#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import importlib
import inspect
import json
import os
import pathlib
import platform
import sys
import textwrap
from collections import Counter
from enum import Enum
from itertools import chain, product
from typing import Any, Iterable
import jsonschema
import yaml
from jsonpath_ng.ext import parse
from rich.console import Console
from tabulate import tabulate
from airflow.cli.commands.info_command import Architecture
# These are deprecated modules that contain removed Hooks/Sensors/Operators that we left in the code,
# so that users get a very specific error message when they try to use them.
EXCLUDED_MODULES = [
    "airflow.providers.apache.hdfs.sensors.hdfs",
    "airflow.providers.apache.hdfs.hooks.hdfs",
]

try:
    from yaml import CSafeLoader as SafeLoader
except ImportError:
    from yaml import SafeLoader  # type: ignore

if __name__ != "__main__":
    raise Exception(
        "This file is intended to be executed as an executable program. You cannot use it as a module."
    )
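
# NOTE: the path arithmetic below assumes this script lives two directories below the
# repository root (e.g. under scripts/ci/), so that parents[2] resolves to the root.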
ROOT_DIR = pathlib.Path(__file__).resolve().parents[2]
DOCS_DIR = ROOT_DIR.joinpath("docs")
PROVIDER_DATA_SCHEMA_PATH = ROOT_DIR.joinpath("airflow", "provider.yaml.schema.json")
PROVIDER_ISSUE_TEMPLATE_PATH = ROOT_DIR.joinpath(
    ".github", "ISSUE_TEMPLATE", "airflow_providers_bug_report.yml"
)
CORE_INTEGRATIONS = ["SQL", "Local"]
errors: list[str] = []
console = Console(width=400, color_system="standard")
suspended_providers: set[str] = set()
suspended_logos: set[str] = set()
suspended_integrations: set[str] = set()
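
# Map a repository-relative Python file path to its dotted module name,
# e.g. "airflow/providers/http/hooks/http.py" -> "airflow.providers.http.hooks.http".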
def _filepath_to_module(filepath: pathlib.Path) -> str:
    p = filepath.resolve().relative_to(ROOT_DIR).as_posix()
    if p.endswith(".py"):
        p = p[:-3]
    return p.replace("/", ".")

def _load_schema() -> dict[str, Any]:
    with PROVIDER_DATA_SCHEMA_PATH.open() as schema_file:
        content = json.load(schema_file)
    return content
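
# Load and schema-validate every provider.yaml. Active providers are returned keyed by
# their repository-relative path; suspended providers are only recorded in the
# module-level suspended_* sets so that later checks can skip them.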
def _load_package_data(package_paths: Iterable[str]):
    schema = _load_schema()
    result = {}
    for provider_yaml_path in package_paths:
        with open(provider_yaml_path) as yaml_file:
            provider = yaml.load(yaml_file, SafeLoader)
        rel_path = pathlib.Path(provider_yaml_path).relative_to(ROOT_DIR).as_posix()
        try:
            jsonschema.validate(provider, schema=schema)
        except jsonschema.ValidationError:
            raise Exception(f"Unable to parse: {rel_path}.")
        if not provider.get("suspended"):
            result[rel_path] = provider
        else:
            suspended_providers.add(provider["package-name"])
            for integration in provider["integrations"]:
                suspended_integrations.add(integration["integration-name"])
                if "logo" in integration:
                    suspended_logos.add(integration["logo"])
    return result

def get_all_integration_names(yaml_files) -> list[str]:
    all_integrations = [
        i["integration-name"] for f in yaml_files.values() if "integrations" in f for i in f["integrations"]
    ]
    all_integrations += ["Local"]
    return all_integrations

def check_integration_duplicates(yaml_files: dict[str, dict]):
    """Integration names must be globally unique."""
    print("Checking integration duplicates")
    all_integrations = get_all_integration_names(yaml_files)
    duplicates = [(k, v) for (k, v) in Counter(all_integrations).items() if v > 1]
    if duplicates:
        print(
            "Duplicate integration names found. Integration names must be globally unique. "
            "Please delete duplicates."
        )
        print(tabulate(duplicates, headers=["Integration name", "Number of occurrences"]))
        sys.exit(3)
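
# unittest-style set comparison that raises AssertionError with a readable, item-per-line
# diff; with allow_extra_in_set2=True, only items missing from set2 are reported.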
def assert_sets_equal(set1, set2, allow_extra_in_set2=False):
    try:
        difference1 = set1.difference(set2)
    except TypeError as e:
        raise AssertionError(f"invalid type when attempting set difference: {e}")
    except AttributeError as e:
        raise AssertionError(f"first argument does not support set difference: {e}")
    try:
        difference2 = set2.difference(set1)
    except TypeError as e:
        raise AssertionError(f"invalid type when attempting set difference: {e}")
    except AttributeError as e:
        raise AssertionError(f"second argument does not support set difference: {e}")
    if difference1 or (difference2 and not allow_extra_in_set2):
        lines = []
        if difference1:
            lines.append(" -- Items in the left set but not the right:")
            for item in sorted(difference1):
                lines.append(f" {item!r}")
        if difference2 and not allow_extra_in_set2:
            lines.append(" -- Items in the right set but not the left:")
            for item in sorted(difference2):
                lines.append(f" {item!r}")
        standard_msg = "\n".join(lines)
        raise AssertionError(standard_msg)

class ObjectType(Enum):
    MODULE = "module"
    CLASS = "class"
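
# Verify that a referenced object can actually be imported: for CLASS entries the module
# part is imported and the attribute checked with inspect.isclass; for MODULE entries the
# module itself is imported. On success the function returns early; otherwise (import
# failure or wrong kind of object) an entry is appended to the global errors list.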
def check_if_object_exist(object_name: str, resource_type: str, yaml_file_path: str, object_type: ObjectType):
    try:
        if object_type == ObjectType.CLASS:
            module_name, object_name = object_name.rsplit(".", maxsplit=1)
            the_class = getattr(importlib.import_module(module_name), object_name)
            if the_class and inspect.isclass(the_class):
                return
        elif object_type == ObjectType.MODULE:
            module = importlib.import_module(object_name)
            if inspect.ismodule(module):
                return
        else:
            raise RuntimeError(f"Wrong enum {object_type}???")
    except Exception as e:
        errors.append(
            f"The `{object_name}` object in {resource_type} list in {yaml_file_path} does not exist "
            f"or is not a {object_type.value}: {e}"
        )
    else:
        errors.append(
            f"The `{object_name}` object in {resource_type} list in {yaml_file_path} does not exist "
            f"or is not a {object_type.value}."
        )

def check_if_objects_exist_and_belong_to_package(
    object_names: set[str],
    provider_package: str,
    yaml_file_path: str,
    resource_type: str,
    object_type: ObjectType,
):
    for object_name in object_names:
        if os.environ.get("VERBOSE"):
            console.print(
                f"[bright_blue]Checking if {object_name} of {resource_type} "
                f"in {yaml_file_path} is {object_type.value} and belongs to {provider_package} package"
            )
        if not object_name.startswith(provider_package):
            errors.append(
                f"The `{object_name}` object in {resource_type} list in {yaml_file_path} does not start"
                f" with the expected {provider_package}."
            )
        check_if_object_exist(object_name, resource_type, yaml_file_path, object_type)
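
# Derive, for one resource type ("sensors", "operators", "hooks" or "transfers"), the set
# of modules expected from the .py files on disk next to the provider.yaml, the dotted
# provider package name, and the resource entries declared in the provider.yaml itself.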
def parse_module_data(provider_data, resource_type, yaml_file_path):
    package_dir = ROOT_DIR.joinpath(yaml_file_path).parent
    provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".")
    py_files = chain(
        package_dir.glob(f"**/{resource_type}/*.py"),
        package_dir.glob(f"{resource_type}/*.py"),
        package_dir.glob(f"**/{resource_type}/**/*.py"),
        package_dir.glob(f"{resource_type}/**/*.py"),
    )
    expected_modules = {_filepath_to_module(f) for f in py_files if f.name != "__init__.py"}
    resource_data = provider_data.get(resource_type, [])
    return expected_modules, provider_package, resource_data
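
# Cross-check the sensors/operators/hooks modules listed in each provider.yaml against the
# .py files actually present in the package; both missing and stale entries are reported.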
def check_correctness_of_list_of_sensors_operators_hook_modules(yaml_files: dict[str, dict]):
    print("Checking completeness of list of {sensors, hooks, operators}")
    print(" -- {sensors, hooks, operators} - Expected modules (left): Current modules (right)")
    for (yaml_file_path, provider_data), resource_type in product(
        yaml_files.items(), ["sensors", "operators", "hooks"]
    ):
        expected_modules, provider_package, resource_data = parse_module_data(
            provider_data, resource_type, yaml_file_path
        )
        expected_modules = {module for module in expected_modules if module not in EXCLUDED_MODULES}
        current_modules = {str(i) for r in resource_data for i in r.get("python-modules", [])}
        check_if_objects_exist_and_belong_to_package(
            current_modules, provider_package, yaml_file_path, resource_type, ObjectType.MODULE
        )
        try:
            assert_sets_equal(set(expected_modules), set(current_modules))
        except AssertionError as ex:
            nested_error = textwrap.indent(str(ex), " ")
            errors.append(
                f"Incorrect content of key '{resource_type}/python-modules' "
                f"in file: {yaml_file_path}\n{nested_error}"
            )

def check_duplicates_in_integrations_names_of_hooks_sensors_operators(yaml_files: dict[str, dict]):
    print("Checking for duplicates in list of {sensors, hooks, operators}")
    for (yaml_file_path, provider_data), resource_type in product(
        yaml_files.items(), ["sensors", "operators", "hooks"]
    ):
        resource_data = provider_data.get(resource_type, [])
        current_integrations = [r.get("integration-name", "") for r in resource_data]
        if len(current_integrations) != len(set(current_integrations)):
            for integration in current_integrations:
                if current_integrations.count(integration) > 1:
                    errors.append(
                        f"Duplicated content of '{resource_type}/integration-name/{integration}' "
                        f"in file: {yaml_file_path}"
                    )

def check_completeness_of_list_of_transfers(yaml_files: dict[str, dict]):
    print("Checking completeness of list of transfers")
    resource_type = "transfers"
    print(" -- Expected transfers modules (left): Current transfers modules (right)")
    for yaml_file_path, provider_data in yaml_files.items():
        expected_modules, provider_package, resource_data = parse_module_data(
            provider_data, resource_type, yaml_file_path
        )
        expected_modules = {module for module in expected_modules if module not in EXCLUDED_MODULES}
        current_modules = {r.get("python-module") for r in resource_data}
        check_if_objects_exist_and_belong_to_package(
            current_modules, provider_package, yaml_file_path, resource_type, ObjectType.MODULE
        )
        try:
            assert_sets_equal(set(expected_modules), set(current_modules))
        except AssertionError as ex:
            nested_error = textwrap.indent(str(ex), " ")
            errors.append(
                f"Incorrect content of key '{resource_type}/python-module' "
                f"in file: {yaml_file_path}\n{nested_error}"
            )

def check_hook_classes(yaml_files: dict[str, dict]):
    print("Checking connection classes belong to package, exist and are classes")
    resource_type = "hook-class-names"
    for yaml_file_path, provider_data in yaml_files.items():
        provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".")
        hook_class_names = provider_data.get(resource_type)
        if hook_class_names:
            check_if_objects_exist_and_belong_to_package(
                hook_class_names, provider_package, yaml_file_path, resource_type, ObjectType.CLASS
            )

def check_trigger_classes(yaml_files: dict[str, dict]):
    print("Checking trigger classes belong to package, exist and are classes")
    resource_type = "triggers"
    for yaml_file_path, provider_data in yaml_files.items():
        provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".")
        trigger_classes = {
            name
            for trigger_class in provider_data.get(resource_type, {})
            for name in trigger_class["class-names"]
        }
        if trigger_classes:
            check_if_objects_exist_and_belong_to_package(
                trigger_classes, provider_package, yaml_file_path, resource_type, ObjectType.CLASS
            )

def check_plugin_classes(yaml_files: dict[str, dict]):
    print("Checking plugin classes belong to package, exist and are classes")
    resource_type = "plugins"
    for yaml_file_path, provider_data in yaml_files.items():
        provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".")
        plugins = provider_data.get(resource_type)
        if plugins:
            check_if_objects_exist_and_belong_to_package(
                {plugin["plugin-class"] for plugin in plugins},
                provider_package,
                yaml_file_path,
                resource_type,
                ObjectType.CLASS,
            )

def check_extra_link_classes(yaml_files: dict[str, dict]):
    print("Checking extra-links belong to package, exist and are classes")
    resource_type = "extra-links"
    for yaml_file_path, provider_data in yaml_files.items():
        provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".")
        extra_links = provider_data.get(resource_type)
        if extra_links:
            check_if_objects_exist_and_belong_to_package(
                extra_links, provider_package, yaml_file_path, resource_type, ObjectType.CLASS
            )
def check_duplicates_in_list_of_transfers(yaml_files: dict[str, dict]):
    print("Checking for duplicates in list of transfers")
    resource_type = "transfers"
    for yaml_file_path, provider_data in yaml_files.items():
        resource_data = provider_data.get(resource_type, [])
        source_target_integrations = [
            (r.get("source-integration-name", ""), r.get("target-integration-name", ""))
            for r in resource_data
        ]
        if len(source_target_integrations) != len(set(source_target_integrations)):
            for integration_couple in source_target_integrations:
                if source_target_integrations.count(integration_couple) > 1:
                    errors.append(
                        f"Duplicated content of \n"
                        f" '{resource_type}/source-integration-name/{integration_couple[0]}' "
                        f" '{resource_type}/target-integration-name/{integration_couple[1]}' "
                        f"in file: {yaml_file_path}"
                    )

def check_invalid_integration(yaml_files: dict[str, dict]):
    print("Detect unregistered integrations")
    all_integration_names = set(get_all_integration_names(yaml_files))
    for (yaml_file_path, provider_data), resource_type in product(
        yaml_files.items(), ["sensors", "operators", "hooks"]
    ):
        resource_data = provider_data.get(resource_type, [])
        current_names = {r["integration-name"] for r in resource_data}
        invalid_names = current_names - all_integration_names
        if invalid_names:
            errors.append(
                f"Incorrect content of key '{resource_type}/integration-name' in file: {yaml_file_path}. "
                f"Invalid values: {invalid_names}"
            )
    for (yaml_file_path, provider_data), key in product(
        yaml_files.items(), ["source-integration-name", "target-integration-name"]
    ):
        resource_data = provider_data.get("transfers", [])
        current_names = {r[key] for r in resource_data}
        invalid_names = current_names - all_integration_names - suspended_integrations
        if invalid_names:
            errors.append(
                f"Incorrect content of key 'transfers/{key}' in file: {yaml_file_path}. "
                f"Invalid values: {invalid_names}"
            )
def check_doc_files(yaml_files: dict[str, dict]):
    print("Checking doc files")
    current_doc_urls: list[str] = []
    current_logo_urls: list[str] = []
    for provider in yaml_files.values():
        if "integrations" in provider:
            current_doc_urls.extend(
                guide
                for guides in provider["integrations"]
                if "how-to-guide" in guides
                for guide in guides["how-to-guide"]
            )
            current_logo_urls.extend(
                integration["logo"] for integration in provider["integrations"] if "logo" in integration
            )
        if "transfers" in provider:
            current_doc_urls.extend(
                op["how-to-guide"] for op in provider["transfers"] if "how-to-guide" in op
            )
    console.print("[yellow]Suspended providers:[/]")
    console.print(suspended_providers)
    expected_doc_files = chain(
        DOCS_DIR.glob("apache-airflow-providers-*/operators/**/*.rst"),
        DOCS_DIR.glob("apache-airflow-providers-*/transfer/**/*.rst"),
    )
    expected_doc_urls = {
        f"/docs/{f.relative_to(DOCS_DIR).as_posix()}"
        for f in expected_doc_files
        if f.name != "index.rst"
        and "_partials" not in f.parts
        and not any(f.relative_to(DOCS_DIR).as_posix().startswith(s) for s in suspended_providers)
    } | {
        f"/docs/{f.relative_to(DOCS_DIR).as_posix()}"
        for f in DOCS_DIR.glob("apache-airflow-providers-*/operators.rst")
        if not any(f.relative_to(DOCS_DIR).as_posix().startswith(s) for s in suspended_providers)
    }
    console.print("[yellow]Suspended logos:[/]")
    console.print(suspended_logos)
    expected_logo_urls = {
        f"/{f.relative_to(DOCS_DIR).as_posix()}"
        for f in DOCS_DIR.glob("integration-logos/**/*")
        if f.is_file()
        and not any(f"/{f.relative_to(DOCS_DIR).as_posix()}".startswith(s) for s in suspended_logos)
    }
    try:
        print(" -- Checking document urls: expected (left), current (right)")
        assert_sets_equal(set(expected_doc_urls), set(current_doc_urls))
        print(" -- Checking logo urls: expected (left), current (right)")
        assert_sets_equal(set(expected_logo_urls), set(current_logo_urls))
    except AssertionError as ex:
        print(ex)
        sys.exit(1)

def check_unique_provider_name(yaml_files: dict[str, dict]):
    provider_names = [d["name"] for d in yaml_files.values()]
    duplicates = {x for x in provider_names if provider_names.count(x) > 1}
    if duplicates:
        errors.append(f"Provider name must be unique. Duplicates: {duplicates}")
def check_providers_are_mentioned_in_issue_template(yaml_files: dict[str, dict]):
    print("Checking providers are mentioned in issue template")
    prefix_len = len("apache-airflow-providers-")
    short_provider_names = [d["package-name"][prefix_len:] for d in yaml_files.values()]
    # exclude deprecated providers that shouldn't be in the issue template
    deprecated_providers: list[str] = []
    for item in deprecated_providers:
        short_provider_names.remove(item)
    jsonpath_expr = parse('$.body[?(@.attributes.label == "Apache Airflow Provider(s)")]..options[*]')
    with PROVIDER_ISSUE_TEMPLATE_PATH.open() as issue_file:
        issue_template = yaml.safe_load(issue_file)
    all_mentioned_providers = [match.value for match in jsonpath_expr.find(issue_template)]
    try:
        print(
            f" -- Checking providers: present in code (left), "
            f"mentioned in {PROVIDER_ISSUE_TEMPLATE_PATH} (right)"
        )
        # suspended providers are intentionally kept in the issue template
        assert_sets_equal(set(short_provider_names), set(all_mentioned_providers), allow_extra_in_set2=True)
    except AssertionError as ex:
        print(ex)
        sys.exit(1)

def check_providers_have_all_documentation_files(yaml_files: dict[str, dict]):
    expected_files = ["commits.rst", "index.rst", "installing-providers-from-sources.rst"]
    for package_info in yaml_files.values():
        package_name = package_info["package-name"]
        provider_dir = DOCS_DIR.joinpath(package_name)
        for file in expected_files:
            if not provider_dir.joinpath(file).is_file():
                errors.append(
                    f"The provider {package_name} is missing `{file}` in its documentation. "
                    f"Please add the file to {provider_dir}"
                )
if __name__ == "__main__":
    architecture = Architecture.get_current()
    console.print(f"Verifying packages on {architecture} architecture. Platform: {platform.machine()}.")
    provider_files_pattern = pathlib.Path(ROOT_DIR).glob("airflow/providers/**/provider.yaml")
    all_provider_files = sorted(str(path) for path in provider_files_pattern)
    if len(sys.argv) > 1:
        paths = [os.fspath(ROOT_DIR / f) for f in sorted(sys.argv[1:])]
    else:
        paths = all_provider_files
    all_parsed_yaml_files: dict[str, dict] = _load_package_data(paths)
    all_files_loaded = len(all_provider_files) == len(paths)
    check_integration_duplicates(all_parsed_yaml_files)
    check_duplicates_in_integrations_names_of_hooks_sensors_operators(all_parsed_yaml_files)
    check_completeness_of_list_of_transfers(all_parsed_yaml_files)
    check_duplicates_in_list_of_transfers(all_parsed_yaml_files)
    check_hook_classes(all_parsed_yaml_files)
    check_plugin_classes(all_parsed_yaml_files)
    check_extra_link_classes(all_parsed_yaml_files)
    check_trigger_classes(all_parsed_yaml_files)
    check_correctness_of_list_of_sensors_operators_hook_modules(all_parsed_yaml_files)
    check_unique_provider_name(all_parsed_yaml_files)
    check_providers_have_all_documentation_files(all_parsed_yaml_files)
    if all_files_loaded:
        # Only run these checks if all provider files are loaded
        check_doc_files(all_parsed_yaml_files)
        check_invalid_integration(all_parsed_yaml_files)
        check_providers_are_mentioned_in_issue_template(all_parsed_yaml_files)
    if errors:
        console.print(f"[red]Found {len(errors)} errors in providers[/]")
        for error in errors:
            console.print(f"[red]Error:[/] {error}")
        sys.exit(1)