blob: 7f03de5c3284ca5ca89fcd52261cf323264919a2 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
import re
import sys
from datetime import datetime
from os.path import dirname, join, realpath
from typing import NamedTuple
from urllib.parse import urlsplit
import jinja2
from bs4 import BeautifulSoup
from github3 import login
from jinja2 import StrictUndefined
from tabulate import tabulate
class TestResult(NamedTuple):
    """Outcome of one test case taken from the JUnit XML report."""

    # Unique key of the test, built as "<classname>::<name>".
    test_id: str
    # Value of the test case's "file" attribute.
    file: str
    # Value of the test case's "name" attribute.
    name: str
    # Value of the test case's "classname" attribute.
    classname: str
    # Value of the test case's "line" attribute (kept as text).
    line: str
    # True when the test is considered to have passed.
    result: bool
class TestHistory(NamedTuple):
    """Row of the quarantine table: one test and its recent run outcomes."""

    # Unique key of the test (same value as TestResult.test_id).
    test_id: str
    # Display name of the test.
    name: str
    # Markdown link "[name](target)" rendered in the first table column.
    url: str
    # Run outcomes, most recent first (True = passed).
    states: list[bool]
    # Free-text content of the "Comment" column.
    comment: str
# Test results parsed from the JUnit XML report; filled by the script entry point.
test_results = []
# GitHub owner and repository name; the script entry point overwrites them
# from the GITHUB_REPOSITORY environment variable.
user = ""
repo = ""
# Number of the GitHub issue holding the table (ISSUE_ID environment variable).
issue_id = 0
# How many of the most recent runs are kept per test (NUM_RUNS environment variable).
num_runs = 10
# Matches a Markdown link "[text](target)" at the start of a string:
# group(1) is the text, group(2) the target. The text is matched non-greedily
# up to the first "](" so that names containing "]" (for example parametrized
# tests such as "test_x[a]") are still recognised.
url_pattern = re.compile(r"\[(.*?)]\(([^)]*)\)")
# Emoji shortcodes used in the table and the run outcome each one stands for.
status_map: dict[str, bool] = {
    ":heavy_check_mark:": True,
    ":x:": False,
}
# Inverse lookup: run outcome -> emoji shortcode.
reverse_status_map: dict[bool, str] = {state: emoji for emoji, state in status_map.items()}
def get_url(result: TestResult) -> str:
    """Return the Markdown link shown for *result* in the table.

    The link text is the test name. The target points at the test's file on
    the ``main`` branch of the repository given by the module-level ``user``
    and ``repo``, carries the test id in the ``test_id`` query parameter and
    the line number in the ``#L<line>`` fragment.
    """
    target = (
        f"https://github.com/{user}/{repo}/blob/"
        f"main/{result.file}?test_id={result.test_id}#L{result.line}"
    )
    return f"[{result.name}]({target})"
def parse_state_history(history_string: str) -> list[bool]:
    """Convert a space-separated run of status emoji into booleans.

    Empty pieces produced by repeated or surrounding spaces are ignored.

    Raises:
        KeyError: If a piece is not a key of ``status_map``.
    """
    return [status_map[token] for token in history_string.split(" ") if token]
def parse_test_history(line: str) -> TestHistory | None:
    """Parse one row of the Markdown table into a TestHistory.

    The row is split on "|": cell 1 must hold the Markdown link of the test,
    cell 3 the emoji run history and cell 5 (when present) the comment.

    Args:
        line: A table row such as ``| [name](url) | Failed | :x: | Flaky | note |``.

    Returns:
        The parsed history, or None when cell 1 does not start with a Markdown link.

    Raises:
        IndexError: If the row has no cell 1 or the link's query string
            contains no "=".
    """
    cells = line.split("|")
    match_url = url_pattern.match(cells[1].strip())
    if not match_url:
        return None
    # The test id is everything after the first "=" of the query string;
    # limiting the split keeps ids that themselves contain "=" intact.
    the_id = urlsplit(match_url.group(2)).query.split("=", 1)[1]
    comment = cells[5] if len(cells) >= 6 else ""
    try:
        states = parse_state_history(cells[3])
    except (KeyError, IndexError):
        # Unknown status token or missing history cell: start from an empty history.
        states = []
    return TestHistory(
        test_id=the_id,
        name=match_url.group(1),
        states=states,
        url=match_url.group(0),
        comment=comment,
    )
def parse_body(body: str | None) -> dict[str, TestHistory]:
    """Extract the per-test histories from the Markdown table in an issue body.

    Lines are ignored until the first one starting with "|-" (the table's
    separator row). From then on every line starting with "|" is handed to
    parse_test_history, and the first line that does not start with "|" ends
    the scan. Rows that cannot be parsed are reported and skipped.

    Args:
        body: Text of the issue; None or an empty string yields an empty result.

    Returns:
        Mapping of test id to the TestHistory parsed from its row.
    """
    test_history_map: dict[str, TestHistory] = {}
    if not body:
        # An issue without a body has no table to read.
        return test_history_map
    inside_table = False
    for line in body.splitlines():
        if line.startswith("|-"):
            inside_table = True
            continue
        if not inside_table:
            continue
        if not line.startswith("|"):
            break
        try:
            status = parse_test_history(line)
        except Exception as err:
            # Best effort: one malformed row must not discard the other rows.
            print(f"Skipping unparsable history row {line!r}: {err!r}")
            continue
        if status:
            test_history_map[status.test_id] = status
    return test_history_map
def update_test_history(history: TestHistory, last_status: bool):
    """Return a copy of *history* with *last_status* recorded as the newest run.

    The new outcome is put in front of the existing ones and the list is cut
    to the first ``num_runs`` entries; all other fields are carried over.
    The input history is not modified.
    """
    print(f"Adding status to test history: {history}, {last_status}")
    recent_states = ([last_status] + history.states)[0:num_runs]
    return history._replace(states=recent_states)
def create_test_history(result: TestResult) -> TestHistory:
    """Start a new history for a test that has no row in the table yet.

    The history holds the single outcome of *result*, the link produced by
    get_url and an empty comment.
    """
    print(f"Creating test history {result}")
    first_run = [result.result]
    return TestHistory(
        test_id=result.test_id,
        name=result.name,
        url=get_url(result),
        states=first_run,
        comment="",
    )
def get_history_status(history: TestHistory):
    """Summarise the recorded runs of a test as a short status label.

    With fewer than ``num_runs`` recorded runs the label is "So far, so good"
    when all of them passed and "Flaky" otherwise. With a full history it is
    "Stable" when every run passed, "Just one more" when the newest
    ``num_runs - 1`` passed, "Almost there" when the newest half passed, and
    "Flaky" in all remaining cases.
    """
    runs = history.states
    has_full_history = len(runs) >= num_runs
    if all(runs):
        return "Stable" if has_full_history else "So far, so good"
    if not has_full_history:
        return "Flaky"
    # Check progressively shorter windows of the most recent runs.
    windows = (
        (num_runs - 1, "Just one more"),
        (int(num_runs / 2), "Almost there"),
    )
    for size, label in windows:
        if all(runs[0:size]):
            return label
    return "Flaky"
def get_table(history_map: dict[str, TestHistory]) -> str:
    """Render the histories as a GitHub-flavoured Markdown table.

    Rows are ordered by test id. Each row holds the test link, the outcome of
    the newest run, the emoji history, the status label and the comment.

    Raises:
        IndexError: If a history has no recorded runs.
    """
    headers = ["Test", "Last run", f"Last {num_runs} runs", "Status", "Comment"]
    ordered_histories = (history_map[test_id] for test_id in sorted(history_map.keys()))
    the_table: list[list[str]] = [
        [
            entry.url,
            "Succeeded" if entry.states[0] else "Failed",
            " ".join(reverse_status_map[state] for state in entry.states),
            get_history_status(entry),
            entry.comment,
        ]
        for entry in ordered_histories
    ]
    return tabulate(the_table, headers, tablefmt="github")
# Script entry point: read a JUnit XML report, merge its results into the
# history table stored in a GitHub issue and write the refreshed table back.
#
# Usage: <script> JUNIT_XML_FILE
# Environment: GITHUB_TOKEN, GITHUB_REPOSITORY ("owner/repo"), ISSUE_ID and,
# optionally, NUM_RUNS (defaults to 10).
if __name__ == "__main__":
    # Only this entry point needs timezone, so it is imported locally.
    from datetime import timezone

    if len(sys.argv) < 2:
        print("Provide XML JUNIT FILE as first argument")
        sys.exit(1)

    with open(sys.argv[1], encoding="utf-8") as f:
        text = f.read()
    y = BeautifulSoup(text, "html.parser")
    res = y.testsuites.testsuite.find_all("testcase")
    for test in res:
        test_id = test["classname"] + "::" + test["name"]
        print("Parsing: " + test_id)
        # Look for the outcome elements explicitly: other children (whitespace,
        # captured output, properties) say nothing about pass or fail.
        if test.find("skipped", recursive=False) is not None:
            print(f"skipping {test['name']}")
            continue
        failed = test.find(["failure", "error"], recursive=False) is not None
        test_results.append(
            TestResult(
                test_id=test_id,
                file=test["file"],
                line=test["line"],
                name=test["name"],
                classname=test["classname"],
                result=not failed,
            )
        )

    token = os.environ.get("GITHUB_TOKEN")
    # Never echo the secret itself into the log; only report whether it is present.
    print(f"Token: {'set' if token else 'not set'}")
    github_repository = os.environ.get("GITHUB_REPOSITORY")
    if not github_repository:
        raise Exception("GitHub Repository must be defined!")
    # These assignments rebind the module-level names read by the helper functions.
    user, repo = github_repository.split("/")
    print(f"User: {user}, Repo: {repo}")
    issue_id = int(os.environ.get("ISSUE_ID", 0))
    num_runs = int(os.environ.get("NUM_RUNS", 10))
    if issue_id == 0:
        raise Exception("You need to define ISSUE_ID as environment variable")
    if not token:
        raise Exception("You need to define GITHUB_TOKEN as environment variable")

    gh = login(token=token)
    quarantined_issue = gh.issue(user, repo, issue_id)
    # An issue without a description has no body; treat it as empty text.
    issue_body = quarantined_issue.body or ""
    print("-----")
    print(issue_body)
    print("-----")
    parsed_test_map = parse_body(issue_body)

    # Only tests present in the current report end up in the new table.
    new_test_map: dict[str, TestHistory] = {}
    for test_result in test_results:
        previous_results = parsed_test_map.get(test_result.test_id)
        if previous_results:
            updated_results = update_test_history(previous_results, test_result.result)
            new_test_map[previous_results.test_id] = updated_results
        else:
            new_history = create_test_history(test_result)
            new_test_map[new_history.test_id] = new_history

    table = get_table(new_test_map)
    print()
    print("Result:")
    print()
    print(table)
    print()

    header_path = join(dirname(realpath(__file__)), "quarantine_issue_header.md")
    with open(header_path, encoding="utf-8") as f:
        # Same naive UTC value utcnow() produced, without the deprecated call.
        header = jinja2.Template(f.read(), autoescape=True, undefined=StrictUndefined).render(
            DATE_UTC_NOW=datetime.now(timezone.utc).replace(tzinfo=None)
        )
    # Close the issue when the report contained no (non-skipped) tests.
    quarantined_issue.edit(
        title=None,
        body=header + "\n\n" + str(table),
        state="open" if test_results else "closed",
    )