blob: 20a0dd2fcbbf8db40ff7e9654410cf183080e88d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import json
import os
from datetime import datetime
from shutil import copyfile, copytree
import pytest
from rich.console import Console
from testcontainers.compose import DockerCompose
from airflow_e2e_tests.constants import (
AWS_INIT_PATH,
DOCKER_COMPOSE_HOST_PORT,
DOCKER_COMPOSE_PATH,
DOCKER_IMAGE,
E2E_DAGS_FOLDER,
E2E_TEST_MODE,
LOCALSTACK_PATH,
LOGS_FOLDER,
TEST_REPORT_FILE,
)
console = Console(width=400, color_system="standard")
compose_instance = None
airflow_logs_path = None
def _setup_s3_integration(dot_env_file, tmp_dir):
copyfile(LOCALSTACK_PATH, tmp_dir / "localstack.yml")
copyfile(AWS_INIT_PATH, tmp_dir / "init-aws.sh")
current_permissions = os.stat(tmp_dir / "init-aws.sh").st_mode
os.chmod(tmp_dir / "init-aws.sh", current_permissions | 0o111)
dot_env_file.write_text(
f"AIRFLOW_UID={os.getuid()}\n"
"AWS_DEFAULT_REGION=us-east-1\n"
"AWS_ENDPOINT_URL_S3=http://localstack:4566\n"
"AIRFLOW__LOGGING__REMOTE_LOGGING=true\n"
"AIRFLOW_CONN_AWS_S3_LOGS=aws://test:test@\n"
"AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=aws_s3_logs\n"
"AIRFLOW__LOGGING__DELETE_LOCAL_LOGS=true\n"
"AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=s3://test-airflow-logs\n"
)
os.environ["ENV_FILE_PATH"] = str(dot_env_file)
def spin_up_airflow_environment(tmp_path_factory):
global compose_instance
global airflow_logs_path
tmp_dir = tmp_path_factory.mktemp("airflow-e2e-tests")
console.print(f"[yellow]Using docker compose file: {DOCKER_COMPOSE_PATH}")
copyfile(DOCKER_COMPOSE_PATH, tmp_dir / "docker-compose.yaml")
subfolders = ("dags", "logs", "plugins", "config")
console.print(f"[yellow]Creating subfolders:[/ {subfolders}")
for subdir in subfolders:
(tmp_dir / subdir).mkdir()
airflow_logs_path = tmp_dir / "logs"
console.print(f"[yellow]Copying dags to:[/ {tmp_dir / 'dags'}")
copytree(E2E_DAGS_FOLDER, tmp_dir / "dags", dirs_exist_ok=True)
dot_env_file = tmp_dir / ".env"
dot_env_file.write_text(f"AIRFLOW_UID={os.getuid()}\n")
console.print(f"[yellow]Creating .env file :[/ {dot_env_file}")
os.environ["AIRFLOW_IMAGE_NAME"] = DOCKER_IMAGE
compose_file_names = ["docker-compose.yaml"]
if E2E_TEST_MODE == "remote_log":
compose_file_names.append("localstack.yml")
_setup_s3_integration(dot_env_file, tmp_dir)
# If we are using the image from ghcr.io/apache/airflow/main we do not pull
# as it is already available and loaded using prepare_breeze_and_image step in workflow
pull = False if DOCKER_IMAGE.startswith("ghcr.io/apache/airflow/main/") else True
try:
console.print(f"[blue]Spinning up airflow environment using {DOCKER_IMAGE}")
compose_instance = DockerCompose(tmp_dir, compose_file_name=compose_file_names, pull=pull)
compose_instance.start()
compose_instance.wait_for(f"http://{DOCKER_COMPOSE_HOST_PORT}/api/v2/version")
compose_instance.exec_in_container(
command=["airflow", "dags", "reserialize"], service_name="airflow-dag-processor"
)
except Exception:
console.print("[red]Failed to start docker compose")
_print_logs(compose_instance)
compose_instance.stop()
raise
def _print_logs(compose_instance):
containers = compose_instance.get_containers()
for container in containers:
service = container.Service
stdout, _ = compose_instance.get_logs(service)
console.print(f"::group:: {service} Logs")
console.print(f"[red]{stdout}")
console.print("::endgroup::")
def pytest_sessionstart(session):
tmp_path_factory = session.config._tmp_path_factory
spin_up_airflow_environment(tmp_path_factory)
console.print("[green]Airflow environment is up and running!")
test_results = []
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
"""Capture test results."""
output = yield
report = output.get_result()
if report.when == "call":
test_result = {
"test_name": item.name,
"test_class": item.cls.__name__ if item.cls else "",
"status": report.outcome,
"duration": report.duration,
"error": str(report.longrepr) if report.failed else None,
"timestamp": datetime.now().isoformat(),
}
test_results.append(test_result)
def pytest_sessionfinish(session, exitstatus):
"""Generate report after all tests complete."""
generate_test_report(test_results)
if airflow_logs_path is not None:
copytree(airflow_logs_path, LOGS_FOLDER, dirs_exist_ok=True)
# If any test failures lets print the services logs
if any(r["status"] == "failed" for r in test_results):
_print_logs(compose_instance=compose_instance)
if compose_instance:
if not os.environ.get("SKIP_DOCKER_COMPOSE_DELETION"):
compose_instance.stop()
def generate_test_report(results):
"""Generate test report with json summary."""
report = {
"summary": {
"total_tests": len(results),
"passed": len([r for r in results if r["status"] == "passed"]),
"failed": len([r for r in results if r["status"] == "failed"]),
"execution_time": sum(r["duration"] for r in results),
},
"test_results": results,
}
with open(TEST_REPORT_FILE, "w") as f:
json.dump(report, f, indent=2)
console.print(f"[blue]\n{'=' * 50}")
console.print("[blue]TEST EXECUTION SUMMARY")
console.print(f"[blue]{'=' * 50}")
console.print(f"[blue]Total Tests: {report['summary']['total_tests']}")
console.print(f"[blue]Passed: {report['summary']['passed']}")
console.print(f"[red]Failed: {report['summary']['failed']}")
console.print(f"[blue]Execution Time: {report['summary']['execution_time']:.2f}s")
console.print("[blue]Reports generated: test_report.json")