blob: 5b987b0794dfa5ae2804d44a04530702f96e4ab9 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# /// script
# requires-python = ">=3.10,<3.11"
# dependencies = [
# "rich>=13.6.0",
# ]
# ///
from __future__ import annotations
import re
import sys
from pathlib import Path
from rich.console import Console
if __name__ not in ("__main__", "__mp_main__"):
raise SystemExit(
"This file is intended to be executed as an executable program. You cannot use it as a module."
f"To execute this script, run ./{__file__} [FILE] ..."
)
console = Console(color_system="standard", width=200)
errors: list[str] = []
WATCHER_APPEND_INSTRUCTION = "list(dag.tasks) >> watcher()"
WATCHER_APPEND_INSTRUCTION_SHORT = " >> watcher()"
PYTEST_FUNCTION = """
from tests_common.test_utils.system_tests import get_test_run # noqa: E402
# Needed to run the example DAG with pytest (see: contributing-docs/testing/system_tests.rst)
test_run = get_test_run(dag)
"""
PYTEST_FUNCTION_PATTERN = re.compile(
r"from tests_common\.test_utils\.system_tests import get_test_run(?: # noqa: E402)?\s+"
r"(?:# .+\))?\s+"
r"test_run = get_test_run"
)
DAG_WITH_ARGS = re.compile(
r"with\s+DAG\s*\(\s*(?P<args>.*?)\)\s*as\s+\w+\s*:",
re.DOTALL,
)
REQUIRES_SCHEDULE_ERROR = (
"[red]System test DAG must set the 'schedule' parameter (e.g. schedule='@once').[/]\n\n"
"[yellow]Example:[/]\n\n"
"with DAG(\n"
" DAG_ID,\n"
" schedule='@once',\n"
" start_date=datetime(2021, 1, 1),\n"
" catchup=False,\n"
" tags=[...],\n"
") as dag:\n"
)
def _check_file(file: Path):
content = file.read_text()
console.print("----------file path: ", file)
for m in DAG_WITH_ARGS.finditer(content):
args = m.group("args")
if "schedule" not in args:
errors.append(
f"In {file}: System test DAG must include the 'schedule' parameter "
f"(e.g. schedule='@once') inside DAG(...).\n"
)
elif "providers/google/" in str(file).replace("\\", "/") and "schedule=None" in args:
errors.append(
f"In {file}: System test DAG should not include the 'schedule' parameter "
f"with value 'None' inside DAG(...): this configuration will make the automated run not possible for the test.\n"
)
if "from tests_common.test_utils.watcher import watcher" in content:
index = content.find(WATCHER_APPEND_INSTRUCTION_SHORT)
if index == -1:
errors.append(
f"[red]The example {file} imports tests_common.test_utils.watcher "
f"but does not use it properly![/]\n\n"
"[yellow]Make sure you have:[/]\n\n"
f" {WATCHER_APPEND_INSTRUCTION_SHORT}\n\n"
"[yellow]as the last instruction in your example DAG.[/]\n"
)
else:
operator_leftshift_index = content.find("<<", index + len(WATCHER_APPEND_INSTRUCTION))
operator_rightshift_index = content.find(">>", index + len(WATCHER_APPEND_INSTRUCTION))
if operator_leftshift_index != -1 or operator_rightshift_index != -1:
errors.append(
f"[red]In the example {file} "
f"watcher is not the last instruction in your DAG "
f"(there are << or >> operators after it)![/]\n\n"
"[yellow]Make sure you have:[/]\n"
f" {WATCHER_APPEND_INSTRUCTION}\n\n"
"[yellow]as the last instruction in your example DAG.[/]\n"
)
if not PYTEST_FUNCTION_PATTERN.search(content):
errors.append(
f"[yellow]The example {file} missed the pytest function at the end.[/]\n\n"
"All example tests should have this function added:\n\n" + PYTEST_FUNCTION + "\n\n"
"[yellow]Automatically adding it now![/]\n"
)
file.write_text(content + "\n" + PYTEST_FUNCTION)
if __name__ == "__main__":
for file in sys.argv[1:]:
_check_file(Path(file))
if errors:
console.print("[red]There were some errors in the example files[/]\n")
for error in errors:
console.print(error)
sys.exit(1)