blob: bf5aef53ffadbd7ad44777480b3e8eda9c4c677d [file]
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# /// script
# requires-python = ">=3.10"
# ///
"""
Analyze E2E test results across multiple workflow runs to identify flaky tests.
Downloads e2e-test-report artifacts from recent workflow runs, aggregates
failure data, and produces a Slack-formatted report.
The script queries GitHub Actions for recent runs of the UI E2E test workflows,
downloads the test report artifacts, and analyzes failure patterns to identify
tests that are good candidates for marking with test.fixme.
Environment variables (required):
GITHUB_REPOSITORY - Owner/repo (e.g. apache/airflow)
GITHUB_TOKEN - GitHub token for API access
Environment variables (optional):
MAX_RUNS - Maximum number of workflow runs to analyze (default: 10)
WORKFLOW_NAME - Workflow file name to query (default: ci-amd-arm.yml)
BRANCH - Branch to filter runs (default: main)
OUTPUT_FILE - Path for the Slack message output (default: slack-message.json)
GITHUB_OUTPUT - Path to GitHub Actions output file
GITHUB_STEP_SUMMARY - Path to GitHub Actions step summary file
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
import tempfile
from collections import defaultdict
from pathlib import Path
# Browsers the E2E suite runs against; per-browser artifact names
# ("e2e-test-report-<browser>") are derived from this list.
BROWSERS = ["chromium", "firefox", "webkit"]
# A test failing in >= this fraction of runs is considered a flaky candidate
FLAKY_THRESHOLD = 0.3
def escape_slack_mrkdwn(text: str) -> str:
    """Escape characters with special meaning in Slack mrkdwn.

    Slack requires ``&``, ``<`` and ``>`` in message text to be replaced
    with the HTML entities ``&amp;``, ``&lt;`` and ``&gt;``.

    :param text: Raw text destined for a Slack mrkdwn block.
    :return: Text safe to embed in Slack mrkdwn.
    """
    # Ampersand must be escaped first, otherwise the "&lt;"/"&gt;"
    # entities introduced below would be double-escaped.
    # (The previous replace("&", "&") was a no-op bug.)
    text = text.replace("&", "&amp;")
    text = text.replace("<", "&lt;")
    text = text.replace(">", "&gt;")
    return text
def gh_api(endpoint: str, **kwargs: str) -> str | None:
    """Call the GitHub API via the ``gh`` CLI and return stdout.

    :param endpoint: API path, e.g. ``repos/{owner}/{repo}/...``.
    :param kwargs: Query parameters for the request.
    :return: Stripped stdout on success, ``None`` on a non-zero exit.
    """
    # Force GET explicitly: `gh api` switches to POST whenever -f fields
    # are supplied, but with -X GET the fields become query-string
    # parameters — which is what the "list runs" endpoint expects.
    cmd = ["gh", "api", "-X", "GET", endpoint]
    for key, value in kwargs.items():
        cmd.extend(["-f", f"{key}={value}"])
    # List argv + shell=False: no shell-injection surface.
    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    if result.returncode != 0:
        print(f"gh api error for {endpoint}: {result.stderr}", file=sys.stderr)
        return None
    return result.stdout.strip()
def get_recent_run_ids(repo: str, workflow: str, branch: str, max_runs: int) -> list[dict]:
    """Fetch metadata for recent completed runs of a workflow.

    :param repo: ``owner/name`` repository slug.
    :param workflow: Workflow file name (e.g. ``ci-amd-arm.yml``).
    :param branch: Branch to filter runs by.
    :param max_runs: Upper bound on the number of runs returned.
    :return: List of dicts with id/run_number/created_at/conclusion/html_url;
        empty list when the API call fails or returns unparseable JSON.
    """
    raw = gh_api(
        f"repos/{repo}/actions/workflows/{workflow}/runs",
        branch=branch,
        per_page=str(max_runs),
        status="completed",
    )
    if not raw:
        return []
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return []
    # Keep only the fields the report needs; tolerate missing keys.
    collected = [
        {
            "id": item["id"],
            "run_number": item.get("run_number", "?"),
            "created_at": item.get("created_at", ""),
            "conclusion": item.get("conclusion", "unknown"),
            "html_url": item.get("html_url", ""),
        }
        for item in payload.get("workflow_runs", [])
    ]
    return collected[:max_runs]
def download_artifact(repo: str, run_id: int, artifact_name: str, dest_dir: Path) -> bool:
    """Download one named artifact from a workflow run into *dest_dir*.

    :param repo: ``owner/name`` repository slug.
    :param run_id: Workflow run to download from.
    :param artifact_name: Exact artifact name, e.g. ``e2e-test-report-chromium``.
    :param dest_dir: Directory the artifact is extracted into.
    :return: True on success; False when the download fails (e.g. the
        artifact does not exist because the tests were skipped).
    """
    cmd = [
        "gh",
        "run",
        "download",
        str(run_id),
        "--name",
        artifact_name,
        "--dir",
        str(dest_dir),
        "--repo",
        repo,
    ]
    outcome = subprocess.run(cmd, capture_output=True, text=True, check=False)
    # A non-zero exit is an expected condition here, not an error worth logging.
    return outcome.returncode == 0
def load_json_safe(path: Path) -> dict | None:
"""Load JSON file, returning None on failure."""
if not path.exists():
return None
try:
return json.loads(path.read_text())
except json.JSONDecodeError:
return None
def analyze_runs(repo: str, runs: list[dict]) -> tuple[dict[str, dict], dict[str, dict], int]:
    """Download and analyze test reports from multiple runs.

    For each run, tries to download the per-browser
    ``e2e-test-report-<browser>`` artifact and aggregates the
    ``failures.json`` and ``fixme_tests.json`` payloads it contains.
    Artifacts are extracted into a temporary directory that is removed
    when the analysis finishes.

    Returns:
        - failures_by_test: {test_name: {count, browsers, errors, run_ids}}
        - fixme_tests: {test_name: {spec_file, annotations}}
        - runs_with_data: number of runs that had downloadable data
    """
    # Default entry for a test seen for the first time; "count" is one
    # increment per (run, browser) failure occurrence.
    failures_by_test: dict[str, dict] = defaultdict(
        lambda: {"count": 0, "browsers": set(), "errors": [], "run_ids": []}
    )
    fixme_tests: dict[str, dict] = {}
    runs_with_data = 0
    with tempfile.TemporaryDirectory() as tmpdir:
        for run in runs:
            run_id = run["id"]
            run_has_data = False
            for browser in BROWSERS:
                artifact_name = f"e2e-test-report-{browser}"
                dest = Path(tmpdir) / str(run_id) / browser
                dest.mkdir(parents=True, exist_ok=True)
                if not download_artifact(repo, run_id, artifact_name, dest):
                    # Artifact absent for this run/browser (e.g. tests skipped).
                    continue
                run_has_data = True
                # Process failures
                failures_data = load_json_safe(dest / "failures.json")
                if failures_data:
                    for failure in failures_data.get("failures", []):
                        test_name = failure.get("test", "unknown")
                        entry = failures_by_test[test_name]
                        entry["count"] += 1
                        entry["browsers"].add(browser)
                        # Keep at most three sample error messages per test.
                        if failure.get("error") and len(entry["errors"]) < 3:
                            entry["errors"].append(
                                {
                                    "error": failure["error"],
                                    "browser": browser,
                                    "run_id": run_id,
                                }
                            )
                        # Track distinct runs the test failed in (used later
                        # to normalize the failure rate).
                        if run_id not in entry["run_ids"]:
                            entry["run_ids"].append(run_id)
                # Process fixme tests (same across browsers, but collect once)
                fixme_data = load_json_safe(dest / "fixme_tests.json")
                if fixme_data:
                    for fixme in fixme_data.get("fixme_tests", []):
                        test_name = fixme.get("test", "unknown")
                        if test_name not in fixme_tests:
                            fixme_tests[test_name] = {
                                "spec_file": fixme.get("spec_file", "unknown"),
                                "annotations": fixme.get("annotations", []),
                            }
            if run_has_data:
                runs_with_data += 1
    # Convert sets to sorted lists for JSON serialization
    for entry in failures_by_test.values():
        entry["browsers"] = sorted(entry["browsers"])
    return dict(failures_by_test), fixme_tests, runs_with_data
def identify_flaky_candidates(failures_by_test: dict[str, dict], runs_with_data: int) -> list[dict]:
    """Identify tests that are good candidates for marking as fixme.

    A test qualifies when its failure rate — failures divided by
    (runs it appeared in * browsers it failed on) — reaches
    FLAKY_THRESHOLD.

    :param failures_by_test: Aggregated failure data per test name.
    :param runs_with_data: Number of runs that yielded artifacts.
    :return: Candidate dicts sorted by failure rate, highest first.
    """
    if runs_with_data == 0:
        return []
    flagged: list[dict] = []
    for name, stats in failures_by_test.items():
        # Normalize by the runs this test actually appeared in rather than
        # total runs * browsers, since not every test runs in every browser
        # or every run.
        seen_runs = len(stats["run_ids"]) or 1
        if stats["browsers"]:
            rate = stats["count"] / (seen_runs * len(stats["browsers"]))
        else:
            rate = 0
        if rate < FLAKY_THRESHOLD:
            continue
        flagged.append(
            {
                "test": name,
                "failure_rate": round(rate * 100, 1),
                "failure_count": stats["count"],
                "browsers": stats["browsers"],
                "errors": stats["errors"][:2],
            }
        )
    # Highest failure rates first.
    return sorted(flagged, key=lambda item: item["failure_rate"], reverse=True)
def format_slack_message(
    failures_by_test: dict[str, dict],
    fixme_tests: dict[str, dict],
    flaky_candidates: list[dict],
    runs: list[dict],
    runs_with_data: int,
    repo: str,
) -> dict:
    """Format the analysis results as a Slack Block Kit message.

    Builds header, summary, flaky-candidate, all-failures, and fixme
    sections, each separated by dividers, plus a context footer linking
    to the workflow runs.

    :param failures_by_test: Aggregated failure data per test name.
    :param fixme_tests: Tests already annotated with fixme.
    :param flaky_candidates: Output of identify_flaky_candidates().
    :param runs: Run metadata from get_recent_run_ids().
    :param runs_with_data: How many runs actually had report artifacts.
    :param repo: ``owner/name`` slug used to build links.
    :return: Payload dict with ``channel``, plain-text ``text`` fallback,
        and ``blocks``.
    """
    total_runs = len(runs)
    total_unique_failures = len(failures_by_test)
    total_fixme = len(fixme_tests)
    blocks: list[dict] = []
    # Header
    blocks.append(
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "UI E2E Flaky Test Report",
            },
        }
    )
    # Summary section
    summary_lines = [
        f"*Runs analyzed:* {runs_with_data}/{total_runs}",
        f"*Unique test failures:* {total_unique_failures}",
        f"*Flaky candidates (>={int(FLAKY_THRESHOLD * 100)}% failure rate):* {len(flaky_candidates)}",
        f"*Tests marked fixme:* {total_fixme}",
    ]
    blocks.append(
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": "\n".join(summary_lines)},
        }
    )
    blocks.append({"type": "divider"})
    # Flaky candidates section
    if flaky_candidates:
        blocks.append(
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*Flaky Test Candidates* (consider marking with `test.fixme`):",
                },
            }
        )
        # At most 10 candidate blocks; the rest are summarized in a context block.
        for candidate in flaky_candidates[:10]:
            error_summary = ""
            if candidate["errors"]:
                first_error = escape_slack_mrkdwn(candidate["errors"][0]["error"])
                # Truncate for Slack display
                if len(first_error) > 150:
                    first_error = first_error[:150] + "..."
                error_summary = f"\n> _Error: {first_error}_"
            test_name = escape_slack_mrkdwn(candidate["test"])
            browsers_str = ", ".join(candidate["browsers"])
            blocks.append(
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": (
                            f"*{test_name}*\n"
                            f" Failure rate: *{candidate['failure_rate']}%* "
                            f"({candidate['failure_count']} failures) | "
                            f"Browsers: {browsers_str}"
                            f"{error_summary}"
                        ),
                    },
                }
            )
        if len(flaky_candidates) > 10:
            blocks.append(
                {
                    "type": "context",
                    "elements": [
                        {
                            "type": "mrkdwn",
                            "text": f"_...and {len(flaky_candidates) - 10} more flaky candidates_",
                        }
                    ],
                }
            )
    else:
        blocks.append(
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*No flaky test candidates detected* — all tests are stable or already marked.",
                },
            }
        )
    blocks.append({"type": "divider"})
    # All failures table
    if failures_by_test:
        blocks.append(
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*All Test Failures* (across last runs):",
                },
            }
        )
        # Sort by count descending
        sorted_failures = sorted(failures_by_test.items(), key=lambda x: x[1]["count"], reverse=True)
        failure_lines = []
        for test_name, data in sorted_failures[:15]:
            escaped_name = escape_slack_mrkdwn(test_name)
            browsers_str = ", ".join(data["browsers"])
            error_hint = ""
            if data["errors"]:
                short_err = escape_slack_mrkdwn(data["errors"][0]["error"])
                if len(short_err) > 100:
                    short_err = short_err[:100] + "..."
                error_hint = f"\n> _{short_err}_"
            failure_lines.append(
                f"• *{escaped_name}* — {data['count']}x failures | {browsers_str}{error_hint}"
            )
        # Slack has a 3000 char limit per text block; split if needed
        text = "\n".join(failure_lines)
        if len(text) > 2900:
            text = text[:2900] + "\n_...truncated_"
        blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": text}})
        if len(sorted_failures) > 15:
            blocks.append(
                {
                    "type": "context",
                    "elements": [
                        {
                            "type": "mrkdwn",
                            "text": f"_...and {len(sorted_failures) - 15} more failing tests_",
                        }
                    ],
                }
            )
    else:
        blocks.append(
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*No test failures detected* in the analyzed runs.",
                },
            }
        )
    blocks.append({"type": "divider"})
    # Fixme tests section
    if fixme_tests:
        blocks.append(
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*Tests Marked `fixme`* (waiting for fixes):",
                },
            }
        )
        fixme_lines = []
        for test_name, data in sorted(fixme_tests.items()):
            escaped_name = escape_slack_mrkdwn(test_name)
            spec = escape_slack_mrkdwn(data.get("spec_file", "unknown"))
            fixme_lines.append(f"• {escaped_name} (`{spec}`)")
        text = "\n".join(fixme_lines)
        # Same 3000-char-per-block Slack limit as the failures section.
        if len(text) > 2900:
            text = text[:2900] + "\n_...truncated_"
        blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": text}})
    else:
        blocks.append(
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*No tests are currently marked `fixme`.*",
                },
            }
        )
    # Footer with link
    if runs:
        # runs[0] is the most recent run as returned by the API.
        latest_run = runs[0]
        blocks.append(
            {
                "type": "context",
                "elements": [
                    {
                        "type": "mrkdwn",
                        "text": (
                            f"<https://github.com/{repo}/actions|View all workflow runs> | "
                            f"Latest: <{latest_run['html_url']}|Run #{latest_run['run_number']}>"
                        ),
                    }
                ],
            }
        )
    # Build the plain text fallback
    fallback = (
        f"UI E2E Flaky Test Report: "
        f"{total_unique_failures} unique failures, "
        f"{len(flaky_candidates)} flaky candidates, "
        f"{total_fixme} fixme tests "
        f"(from {runs_with_data} runs)"
    )
    return {
        "channel": "airflow-3-ui",
        "text": fallback,
        "blocks": blocks,
    }
def write_step_summary(
    failures_by_test: dict[str, dict],
    fixme_tests: dict[str, dict],
    flaky_candidates: list[dict],
    runs_with_data: int,
    total_runs: int,
) -> None:
    """Write a GitHub Actions step summary in markdown.

    No-op when the GITHUB_STEP_SUMMARY environment variable is unset
    (i.e. when not running inside GitHub Actions). Appends to the
    summary file rather than overwriting it.

    :param failures_by_test: Aggregated failure data per test name.
    :param fixme_tests: Tests already annotated with fixme.
    :param flaky_candidates: Output of identify_flaky_candidates().
    :param runs_with_data: Number of runs with downloadable artifacts.
    :param total_runs: Total number of runs queried.
    """

    def _md_cell(text: str) -> str:
        # Pipes inside a cell would break the markdown table layout.
        return text.replace("|", "\\|")

    summary_path = os.environ.get("GITHUB_STEP_SUMMARY")
    if not summary_path:
        return
    lines = [
        "## UI E2E Flaky Test Report",
        "",
        "| Metric | Value |",
        "|--------|-------|",
        f"| Runs analyzed | {runs_with_data}/{total_runs} |",
        f"| Unique test failures | {len(failures_by_test)} |",
        f"| Flaky candidates | {len(flaky_candidates)} |",
        f"| Tests marked fixme | {len(fixme_tests)} |",
        "",
    ]
    if flaky_candidates:
        lines.append("### Flaky Candidates")
        lines.append("")
        lines.append("| Test | Failure Rate | Count | Browsers |")
        lines.append("|------|-------------|-------|----------|")
        for c in flaky_candidates[:20]:
            browsers = ", ".join(c["browsers"])
            # Test names can contain "|" (Playwright titles), so escape them.
            lines.append(
                f"| {_md_cell(c['test'])} | {c['failure_rate']}% | {c['failure_count']} | {browsers} |"
            )
        lines.append("")
    if failures_by_test:
        lines.append("### All Failures")
        lines.append("")
        lines.append("| Test | Failures | Browsers | Error |")
        lines.append("|------|----------|----------|-------|")
        sorted_failures = sorted(failures_by_test.items(), key=lambda x: x[1]["count"], reverse=True)
        for test_name, data in sorted_failures[:20]:
            browsers = ", ".join(data["browsers"])
            if data["errors"]:
                error = data["errors"][0]["error"]
                # Only add the ellipsis when text was actually cut off
                # (previously "..." was appended unconditionally).
                if len(error) > 100:
                    error = error[:100] + "..."
            else:
                error = ""
            lines.append(
                f"| {_md_cell(test_name)} | {data['count']} | {browsers} | {_md_cell(error)} |"
            )
        lines.append("")
    if fixme_tests:
        lines.append("### Fixme Tests")
        lines.append("")
        for test_name, data in sorted(fixme_tests.items()):
            lines.append(f"- {test_name} (`{data.get('spec_file', 'unknown')}`)")
        lines.append("")
    # Append: GitHub Actions may have other steps writing to the same summary.
    with open(summary_path, "a") as f:
        f.write("\n".join(lines))
def main() -> None:
    """Entry point: gather runs, aggregate failures, and emit all outputs."""
    # Configuration comes entirely from the environment (see module docstring).
    repo = os.environ.get("GITHUB_REPOSITORY", "apache/airflow")
    max_runs = int(os.environ.get("MAX_RUNS", "10"))
    workflow = os.environ.get("WORKFLOW_NAME", "ci-amd-arm.yml")
    branch = os.environ.get("BRANCH", "main")
    output_file = Path(os.environ.get("OUTPUT_FILE", "slack-message.json"))
    print(f"Analyzing E2E test results for {repo} ({workflow} on {branch})")
    print(f"Looking at up to {max_runs} recent runs...")

    # Query recent completed runs; nothing to analyze means a clean exit.
    runs = get_recent_run_ids(repo, workflow, branch, max_runs)
    if not runs:
        print("No workflow runs found.")
        sys.exit(0)
    print(f"Found {len(runs)} runs to analyze.")

    # Download artifacts and aggregate failure / fixme data.
    failures_by_test, fixme_tests, runs_with_data = analyze_runs(repo, runs)
    print(f"Runs with data: {runs_with_data}/{len(runs)}")
    print(f"Unique failing tests: {len(failures_by_test)}")
    print(f"Fixme tests: {len(fixme_tests)}")

    flaky_candidates = identify_flaky_candidates(failures_by_test, runs_with_data)
    print(f"Flaky candidates: {len(flaky_candidates)}")

    # Emit the Slack payload file for the posting step.
    slack_message = format_slack_message(
        failures_by_test, fixme_tests, flaky_candidates, runs, runs_with_data, repo
    )
    output_file.write_text(json.dumps(slack_message, indent=2))
    print(f"Slack message written to: {output_file}")

    write_step_summary(failures_by_test, fixme_tests, flaky_candidates, runs_with_data, len(runs))

    # Expose machine-readable outputs for downstream workflow steps.
    github_output = os.environ.get("GITHUB_OUTPUT")
    if github_output:
        with open(github_output, "a") as f:
            f.write(f"has-failures={str(len(failures_by_test) > 0).lower()}\n")
            f.write(f"has-flaky-candidates={str(len(flaky_candidates) > 0).lower()}\n")
            f.write(f"unique-failures={len(failures_by_test)}\n")
            f.write(f"flaky-candidates={len(flaky_candidates)}\n")
            f.write(f"fixme-tests={len(fixme_tests)}\n")
            f.write(f"runs-analyzed={runs_with_data}\n")


if __name__ == "__main__":
    main()