# blob: fc6522dcbb822846be8b6cb198066e844d3148f4 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import glob
import itertools
import mmap
import os
import unittest
from parameterized import parameterized
# Symlink-resolved absolute path of the parent of the directory that contains this file.
ROOT_FOLDER = os.path.realpath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
class TestProjectStructure(unittest.TestCase):
    """Checks on the content and layout of Python files found under ``ROOT_FOLDER``."""

    def test_reference_to_providers_from_core(self):
        """Fail if any Python file under ``ROOT_FOLDER/example_dags`` contains the text "providers"."""
        # NOTE(review): the sibling tests glob under f"{ROOT_FOLDER}/airflow/..."; if the example DAGs
        # live under "airflow/" as well, this pattern matches nothing and the test passes vacuously.
        # Confirm the intended directory before changing the path.
        for filename in glob.glob(f"{ROOT_FOLDER}/example_dags/**/*.py", recursive=True):
            self.assert_file_not_contains(filename, "providers")

    def test_deprecated_packages(self):
        """Every file under ``airflow/contrib`` must carry the matching deprecation notice."""
        path_pattern = f"{ROOT_FOLDER}/airflow/contrib/**/*.py"
        for filename in glob.glob(path_pattern, recursive=True):
            if filename.endswith("/__init__.py"):
                self.assert_file_contains(filename, "This package is deprecated.")
            else:
                self.assert_file_contains(filename, "This module is deprecated.")

    @staticmethod
    def _file_contains(filename: str, pattern: str) -> bool:
        """Return True when the UTF-8 encoding of ``pattern`` occurs in the raw bytes of ``filename``."""
        # A plain read is used instead of mmap because mmap raises ValueError on an empty file;
        # an empty file simply contains no pattern.
        with open(filename, 'rb') as file:
            return pattern.encode('utf-8') in file.read()

    def assert_file_not_contains(self, filename: str, pattern: str):
        """
        Fail the test when ``pattern`` occurs anywhere in ``filename``.

        :param filename: path of the file to scan
        :param pattern: text that must be absent from the file
        """
        if self._file_contains(filename, pattern):
            self.fail(f"File {filename} contains illegal pattern - {pattern}")

    def assert_file_contains(self, filename: str, pattern: str):
        """
        Fail the test when ``pattern`` does not occur in ``filename``.

        :param filename: path of the file to scan
        :param pattern: text that must be present in the file
        """
        if not self._file_contains(filename, pattern):
            self.fail(f"File {filename} not contains pattern - {pattern}")

    def test_providers_modules_should_have_tests(self):
        """
        Assert every module in airflow/providers has a corresponding test_ file in tests/providers.
        """
        # TODO: Should we extend this test to cover other directories?
        expected_test_files = set()
        for path in glob.glob(f"{ROOT_FOLDER}/airflow/providers/**/*.py", recursive=True):
            relative_path = os.path.relpath(path, ROOT_FOLDER)
            # Example DAGs and package markers are not expected to have tests of their own.
            if "/example_dags/" in relative_path or relative_path.endswith("__init__.py"):
                continue
            # Swap the leading "airflow/" component for "tests/" ...
            test_path = f'tests/{relative_path.partition("/")[2]}'
            # ... and prefix the file name with "test_".
            directory, _, module_file = test_path.rpartition("/")
            expected_test_files.add(f"{directory}/test_{module_file}")

        current_test_files = set()
        for path in glob.glob(f"{ROOT_FOLDER}/tests/providers/**/*.py", recursive=True):
            relative_path = os.path.relpath(path, ROOT_FOLDER)
            if not relative_path.endswith("__init__.py"):
                current_test_files.add(relative_path)

        missing_tests_files = expected_test_files - current_test_files
        self.assertEqual(set(), missing_tests_files)
class TestGoogleProviderProjectStructure(unittest.TestCase):
    """Checks on the layout of ``providers/google`` in both the ``airflow`` and ``tests`` trees."""

    # (department, service) pairs that are allowed to have no example DAG.
    MISSING_EXAMPLE_DAGS = {
        ('cloud', 'adls_to_gcs'),
        ('cloud', 'sql_to_gcs'),
        ('cloud', 'bigquery_to_mysql'),
        ('cloud', 'cassandra_to_gcs'),
        ('cloud', 'mysql_to_gcs'),
        ('cloud', 'mssql_to_gcs'),
        ('ads', 'ads_to_gcs'),
    }

    @staticmethod
    def _department_and_service(path: str):
        """Return ``(department, service)``: third-from-last path part, and file name up to its first dot."""
        parts = path.split("/")
        return parts[-3], parts[-1].split(".")[0]

    def test_example_dags(self):
        """
        Check example DAG coverage of operator, sensor and transfer modules.

        Three sub-tests run: every module has an example DAG unless it is listed in
        ``MISSING_EXAMPLE_DAGS``; no listed entry has an example DAG by now; and every
        listed entry still corresponds to an existing module.
        """
        operators_modules = itertools.chain(
            *[self.find_resource_files(resource_type=d) for d in ["operators", "sensors", "transfers"]]
        )
        example_dags_files = self.find_resource_files(resource_type="example_dags")

        # Generate tuple of department and service e.g. ('marketing_platform', 'display_video')
        operator_sets = [self._department_and_service(f) for f in operators_modules]
        example_sets = []
        for f in example_dags_files:
            department, service = self._department_and_service(f)
            # Drop the first "example_" so the name lines up with the module name.
            example_sets.append((department, service.replace("example_", "", 1)))

        def has_example_dag(operator_set):
            # An example DAG matches when the department is equal and its name starts with the module name.
            return any(
                e[0] == operator_set[0] and e[1].startswith(operator_set[1]) for e in example_sets
            )

        with self.subTest("Detect missing example dags"):
            missing_example = {s for s in operator_sets if not has_example_dag(s)}
            missing_example -= self.MISSING_EXAMPLE_DAGS
            self.assertEqual(set(), missing_example)

        with self.subTest("Keep update missing example dags list"):
            new_example_dag = set(example_sets).intersection(set(self.MISSING_EXAMPLE_DAGS))
            if new_example_dag:
                new_example_dag_text = '\n'.join(str(f) for f in new_example_dag)
                self.fail(
                    "You've added a example dag currently listed as missing:\n"
                    f"{new_example_dag_text}"
                    "\n"
                    "Thank you very much.\n"
                    "Can you remove it from the list of missing example, please?"
                )

        with self.subTest("Remove extra elements"):
            extra_example_dags = set(self.MISSING_EXAMPLE_DAGS) - set(operator_sets)
            if extra_example_dags:
                extra_example_dags_text = '\n'.join(str(f) for f in extra_example_dags)
                # These entries match no existing module, so the message differs from the one above.
                self.fail(
                    "The list of missing example dags contains entries that do not match any "
                    "operator, sensor or transfer module:\n"
                    f"{extra_example_dags_text}"
                    "\n"
                    "Can you remove them from the list of missing example, please?"
                )

    @parameterized.expand(
        [
            (resource_type, suffix,)
            for suffix in ["_system.py", "_system_helper.py"]
            for resource_type in ["operators", "sensors", "transfers"]
        ]
    )
    def test_detect_invalid_system_tests(self, resource_type, filename_suffix):
        """
        Every test file ending in ``filename_suffix`` must correspond to a module of ``resource_type``.

        :param resource_type: directory name to inspect ("operators", "sensors" or "transfers")
        :param filename_suffix: file name ending that marks the kind of test file being checked
        """
        operators_tests = self.find_resource_files(top_level_directory="tests", resource_type=resource_type)
        operators_files = self.find_resource_files(top_level_directory="airflow", resource_type=resource_type)

        files = {f for f in operators_tests if f.endswith(filename_suffix)}

        # Relative module paths start with "airflow/"; swap that prefix for "tests/".
        prefix_length = len("airflow/")
        expected_files = set()
        for module_path in operators_files:
            candidate = f"tests/{module_path[prefix_length:]}"
            candidate = candidate.replace(".py", filename_suffix).replace("/test_", "/")
            directory, _, file_name = candidate.rpartition("/")
            expected_files.add(f"{directory}/test_{file_name}")

        self.assertEqual(set(), files - expected_files)

    @staticmethod
    def find_resource_files(
        top_level_directory: str = "airflow",
        department: str = "*",
        resource_type: str = "*",
        service: str = "*"
    ):
        """
        Find Python files under ``<top_level_directory>/providers/google``.

        Each argument is inserted into a glob pattern, so ``"*"`` matches any name at that level.

        :param top_level_directory: first path component below ``ROOT_FOLDER``
        :param department: directory directly below ``providers/google``
        :param resource_type: directory directly below the department
        :param service: file name without the ``.py`` extension
        :return: generator of paths relative to ``ROOT_FOLDER``, without ``__init__.py`` files
        """
        python_files = glob.glob(
            f"{ROOT_FOLDER}/{top_level_directory}/providers/google/{department}/{resource_type}/{service}.py"
        )
        # Make path relative
        resource_files = (os.path.relpath(f, ROOT_FOLDER) for f in python_files)
        # Exclude __init__.py
        resource_files = (f for f in resource_files if not f.endswith("__init__.py"))
        return resource_files
class TestOperatorsHooks(unittest.TestCase):
    """Naming rules for files in the providers' resource directories."""

    def test_no_illegal_suffixes(self):
        """No file in the scanned provider directories may end with a legacy-style suffix."""
        illegal_suffixes = ("_operator.py", "_hook.py", "_sensor.py")
        invalid_files = []
        # Scan each resource directory in both the source tree and the tests tree.
        for resource_type in ["operators", "hooks", "sensors", "example_dags"]:
            for part in ["airflow", "tests"]:
                pattern = f"{ROOT_FOLDER}/{part}/providers/**/{resource_type}/*.py"
                for path in glob.glob(pattern, recursive=True):
                    # str.endswith accepts a tuple and matches when any suffix applies.
                    if path.endswith(illegal_suffixes):
                        invalid_files.append(path)
        self.assertEqual([], invalid_files)