| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| import json |
| import os |
| from functools import cache |
| from pathlib import Path |
| from typing import Any |
| |
| import jsonschema |
| import yaml |
| |
| ROOT_DIR = Path(__file__).parents[2].resolve() |
| AIRFLOW_PROVIDERS_DIR = ROOT_DIR / "providers" |
| AIRFLOW_PROVIDERS_SRC = AIRFLOW_PROVIDERS_DIR / "src" |
| PROVIDER_DATA_SCHEMA_PATH = ROOT_DIR / "airflow" / "provider.yaml.schema.json" |
| |
| |
| @cache |
| def provider_yaml_schema() -> dict[str, Any]: |
| with open(PROVIDER_DATA_SCHEMA_PATH) as schema_file: |
| return json.load(schema_file) |
| |
| |
| def _provider_yaml_directory_to_module(provider_yaml_directory_path: str) -> str: |
| return str(Path(provider_yaml_directory_path).relative_to(AIRFLOW_PROVIDERS_SRC)).replace("/", ".") |
| |
| |
| def _get_provider_root_path(provider_yaml_directory_path: Path) -> Path: |
| for parent in Path(provider_yaml_directory_path).parents: |
| if (parent / "src").exists(): |
| return parent |
| raise ValueError( |
| f"The path {provider_yaml_directory_path} should " |
| f"be provider path under `providers/<PROVIDER>/src` folder.`" |
| ) |
| |
| |
| def _filepath_to_system_tests(provider_yaml_directory_path: Path) -> Path: |
| test_root_path = _get_provider_root_path(provider_yaml_directory_path) / "tests" |
| return (test_root_path / "system").relative_to(AIRFLOW_PROVIDERS_DIR) |
| |
| |
| @cache |
| def get_all_provider_yaml_paths() -> list[Path]: |
| """Returns list of all provider.yaml files including new and old structure.""" |
| return sorted(list(AIRFLOW_PROVIDERS_DIR.glob("**/provider.yaml"))) |
| |
| |
| @cache |
| def load_package_data(include_suspended: bool = False) -> list[dict[str, Any]]: |
| """ |
| Load all data from providers files |
| |
| :return: A list containing the contents of all provider.yaml files - old and new structure. |
| """ |
| schema = provider_yaml_schema() |
| result = [] |
| for provider_yaml_path in get_all_provider_yaml_paths(): |
| with open(provider_yaml_path) as yaml_file: |
| provider = yaml.safe_load(yaml_file) |
| try: |
| jsonschema.validate(provider, schema=schema) |
| except jsonschema.ValidationError as ex: |
| msg = f"Unable to parse: {provider_yaml_path}. Original error {type(ex).__name__}: {ex}" |
| raise RuntimeError(msg) |
| if provider["state"] == "suspended" and not include_suspended: |
| continue |
| provider_yaml_dir_str = os.path.dirname(provider_yaml_path) |
| module = provider["package-name"][len("apache-") :].replace("-", ".") |
| module_folder = module[len("airflow-providers-") :].replace(".", "/") |
| provider["python-module"] = module |
| provider["package-dir"] = f"{provider_yaml_dir_str}/src/{module.replace('.', '/')}" |
| provider["docs-dir"] = os.path.dirname(provider_yaml_path.parent / "docs") |
| provider["system-tests-dir"] = f"{provider_yaml_dir_str}/tests/system/{module_folder}" |
| result.append(provider) |
| return result |