blob: fb72d5f14da6eb2a82a394a769c79ee02c0b7ee2 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
import sys
from pathlib import Path
from typing import Tuple
import yaml
from rich.console import Console
# This script is a command-line entry point only, so refuse to be imported.
# "__mp_main__" is the name the multiprocessing module gives to the main
# module when it is re-imported in a spawned child process, so it is allowed.
if __name__ not in ("__main__", "__mp_main__"):
    raise SystemExit(
        # The trailing space keeps the two sentences apart once the adjacent
        # string literals are concatenated.
        "This file is intended to be executed as an executable program. You cannot use it as a module. "
        f"To run this script, run the ./{__file__} command [FILE] ..."
    )
# Shared rich console used for every message this script prints.
console = Console(color_system="standard", width=200)

# Directory three levels above the one containing this script (expected to be
# the root of the Airflow source tree). The path is resolved to an absolute one
# *before* walking up the parents: a relative ``__file__`` such as "script.py"
# has too few parents, so indexing first could raise IndexError or point at the
# wrong directory.
AIRFLOW_SOURCES_ROOT = Path(__file__).resolve().parents[3]

# Both matchers capture the same five groups:
#   1 - text preceding the URL
#   2 - the whole URL
#   3 - the git ref that follows "/tree/"
#   4 - the path below "providers/"
#   5 - the rest of the line, starting at an optional "/" followed by ">"
EXAMPLE_DAGS_URL_MATCHER = re.compile(
    r"^(.*)(https://github.com/apache/airflow/tree/(.*)/airflow/providers/(.*)/example_dags)(/?>.*)$"
)
SYSTEM_TESTS_URL_MATCHER = re.compile(
    r"^(.*)(https://github.com/apache/airflow/tree/(.*)/tests/system/providers/(.*))(/?>.*)$"
)
def get_provider_and_version(url_path: str) -> Tuple[str, str]:
    """Find the provider that owns ``url_path`` and read its version.

    ``url_path`` is split on "/" and the resulting folders are looked up below
    ``airflow/providers`` in the source tree. If no ``provider.yaml`` exists at
    that location, the last folder is dropped and the lookup is repeated one
    level up, until a ``provider.yaml`` is found or no folders remain.

    :param url_path: slash-separated path relative to ``airflow/providers``
    :return: tuple of the provider id (the matching folders joined with "-")
        and the first entry of the ``versions`` list in ``provider.yaml``
        (presumably the most recent release - confirm against the yaml files)
    :raises SystemExit: with exit code 1, after printing an error, when no
        ``provider.yaml`` is found at any level
    """
    providers_root = AIRFLOW_SOURCES_ROOT / "airflow" / "providers"
    candidate_folders = url_path.split("/")
    while candidate_folders:
        provider_yaml = providers_root.joinpath(*candidate_folders) / "provider.yaml"
        try:
            # Explicit encoding so parsing does not depend on the locale.
            with open(provider_yaml, encoding="utf-8") as f:
                provider_info = yaml.safe_load(f)
        except (FileNotFoundError, NotADirectoryError):
            # NotADirectoryError is raised when one of the candidate folders is
            # actually a regular file; either way there is no provider.yaml at
            # this level, so retry one level up.
            candidate_folders = candidate_folders[:-1]
            continue
        version = provider_info["versions"][0]
        # Empty trailing path components (e.g. from a trailing "/") would leave
        # dangling "-" separators at the end of the id.
        provider = "-".join(candidate_folders).rstrip("-")
        return provider, version
    console.print(
        f"[red]Bad example path: {url_path}. Missing "
        f"provider.yaml in any of the 'airflow/providers/{url_path}' folders. [/]"
    )
    sys.exit(1)
def replace_match(file: str, line: str) -> str:
    """Return ``line`` with a provider example URL rewritten to its versioned form.

    The line is tried against the example-dags matcher first and the
    system-tests matcher second; only the first matcher that matches is used.
    The URL is then pointed at the ``providers-<provider>/<version>`` ref,
    targeting the system tests folder when it exists in the source tree and
    the example dags folder otherwise. When neither folder exists an error is
    printed and the line is returned unchanged.

    :param file: name of the file being processed, used only in messages
    :param line: a single line of that file
    :return: the rewritten line, or ``line`` itself when nothing had to change
    """
    for pattern in (EXAMPLE_DAGS_URL_MATCHER, SYSTEM_TESTS_URL_MATCHER):
        found = pattern.match(line)
        if not found:
            continue

        url_path_to_dir = found.group(4)
        path_parts = url_path_to_dir.split("/")
        providers_dir = AIRFLOW_SOURCES_ROOT / "airflow" / "providers"
        system_tests_dir = AIRFLOW_SOURCES_ROOT / "tests" / "system" / "providers"
        example_dags_folder = providers_dir.joinpath(*path_parts) / "example_dags"
        system_tests_folder = system_tests_dir.joinpath(*path_parts)

        # Exits the program when the path belongs to no known provider.
        provider, version = get_provider_and_version(url_path_to_dir)

        updated_line = line
        if system_tests_folder.exists():
            target_url = (
                f"https://github.com/apache/airflow/tree/providers-{provider}/{version}"
                f"/tests/system/providers/{url_path_to_dir}"
            )
            # Groups 1 and 5 keep the text surrounding the URL intact.
            updated_line = pattern.sub(r"\1" + target_url + r"\5", line)
        elif example_dags_folder.exists():
            target_url = (
                f"https://github.com/apache/airflow/tree/providers-{provider}/{version}"
                f"/airflow/providers/{url_path_to_dir}/example_dags"
            )
            updated_line = pattern.sub(r"\1" + target_url + r"\5", line)
        else:
            console.print(
                f"[red] Error - neither example dags nor system tests folder exists for {provider}[/]"
            )

        if updated_line != line:
            console.print(f'[yellow] Replacing in {file}[/]\n{line.strip()}\n{updated_line.strip()}')
        return updated_line

    # Neither matcher recognised the line.
    return line
def find_matches(_file: Path) -> None:
    """Rewrite the example URLs found in ``_file`` in place.

    Every line of the file is passed through ``replace_match``. The file is
    written back only when at least one line actually changed, so untouched
    files keep their modification time and their original bytes.

    :param _file: path of the text file to process (read and written as UTF-8)
    """
    # keepends=True preserves each line terminator so that joining the lines
    # reproduces the text exactly.
    original_lines = _file.read_text(encoding="utf-8").splitlines(keepends=True)
    updated_lines = [replace_match(str(_file), line) for line in original_lines]
    if updated_lines != original_lines:
        _file.write_text("".join(updated_lines), encoding="utf-8")
if __name__ == '__main__':
    # Each command-line argument is a file path; process them in the order given.
    for target in map(Path, sys.argv[1:]):
        find_matches(target)