blob: bee5512eec71f24a0c793f23baedae5894690f07 [file]
#!/usr/bin/env python3
##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Convert Apache Yetus console output to Markdown format.
"""
import os
import re
import sys
from io import TextIOWrapper
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Vote to emoji mapping
VOTE_EMOJI = {
'+1': '✅',
'-1': '❌',
'0': '🆗',
'+0': '🆗',
'-0': '⚠️'
}
def convert_vote(vote: str) -> str:
"""Convert vote string to emoji."""
return VOTE_EMOJI.get(vote, vote)
def is_runtime(text: str) -> bool:
"""Check if text is a runtime like '41m 24s'."""
return bool(re.match(r'^\d+m\s+\d+s$', text))
def parse_table_row(line: str) -> Tuple[str, str, str, str]:
"""
Parse a table row and return tuple of cell values.
Returns exactly 4 columns: (vote, subsystem, runtime, comment)
"""
parts = line.split('|')
# Remove first empty element (from leading |)
parts = parts[1:] if len(parts) > 1 else []
# Take first 4 columns and strip whitespace
result: List[str] = [p.strip() for p in parts[:4]]
# Pad to 4 columns if needed
while len(result) < 4:
result.append('')
return result[0], result[1], result[2], result[3]
def is_results_section_start(line: str) -> bool:
"""Check if line indicates the start of Results section."""
return bool(re.search(r'^\[\w+] Results:', line.strip()))
def is_tests_run_summary(line: str) -> bool:
"""Check if line is the Tests run summary line."""
return bool(re.search(r'^\[\w+] Tests run:', line.strip()))
def parse_results_section(
f: TextIOWrapper,
failures: List[str],
flakes: List[str],
errors: List[str]
) -> None:
"""
Parse the Results section within a patch-unit file.
"""
current_error_type = None
while line := f.readline():
stripped = line.strip()
# Section end markers
if is_tests_run_summary(line):
return
# Detect error type sections
if re.search(r'^\[\w+] Failures:', stripped):
current_error_type = failures
elif re.search(r'^\[\w+] Flakes:', stripped):
current_error_type = flakes
elif re.search(r'^\[\w+] Errors:', stripped):
current_error_type = errors
else:
# Parse test entries
if current_error_type is not None:
test_match = re.search(
r'^\[\w+]\s+((?:org\.)?\S+\.(?:\w+\.)*\w+\.\w+)',
stripped
)
if test_match:
test_name = test_match.group(1)
if 'test' in test_name.lower():
current_error_type.append(test_name)
def skip_to_results_section(f: TextIOWrapper) -> bool:
"""
Skip the io stream to the Results section.
After calling this method, the TextIOWrapper will locate at the next line of "Results: "
Returns:
True if we find a results section, False if we have reached the EOF
"""
while line := f.readline():
if is_results_section_start(line):
return True
return False
def scan_all_tests(dir: Path) -> Dict[str, str]:
"""
Scan the archiver dir to find all the tests and their module
Returns:
Dict mapping test name to module name
"""
module = None
module_to_test_name = {}
for dirpath, _, filenames in os.walk(dir):
if len(filenames) > 0:
# /archiver/<module>/target/surefire-reports
module = dirpath.split(os.sep)[-3]
for filename in filenames:
match = re.match(r'(org\.apache\.[^-]+)\.txt', filename)
if match:
module_to_test_name[match.group(1)] = module
return module_to_test_name
def parse_patch_unit_file(
file_path: Path,
failures: List[str],
flakes: List[str],
errors: List[str]
) -> None:
"""
Parse a patch-unit-*.txt file and extract failed tests by module.
"""
with open(file_path, 'r') as f:
while skip_to_results_section(f):
parse_results_section(f, failures, flakes, errors)
def get_module(test_name: str, test_name_to_module: Dict[str, str]) -> str:
rindex_of_bracket = test_name.rfind('[')
if rindex_of_bracket > 0:
# parameterized test, remove the tailing parameters
test_name = test_name[:rindex_of_bracket]
module = test_name_to_module.get(test_name)
if module:
return module
# usually the failed test name has the method name suffix, but the test_name_to_module only
# contains class name, so let's try to remove the last part and try again
rindex_of_dot = test_name.rfind('.')
if rindex_of_dot > 0:
test_name = test_name[:rindex_of_dot]
module = test_name_to_module.get(test_name)
if module:
return module
return 'default'
def increase(module_to_count: Dict[str, int], module: str) -> None:
if module in module_to_count:
module_to_count[module] += 1
else:
module_to_count[module] = 1
def add_to_details(test_name: str, module: str, error_type: str,
details: Dict[str, Dict[str, List[str]]]) -> None:
if module not in details:
error_type_to_tests = {}
details[module] = error_type_to_tests
else:
error_type_to_tests = details[module]
if error_type in error_type_to_tests:
error_type_to_tests[error_type].append(test_name)
else:
error_type_to_tests[error_type] = [test_name]
def process_failed_tests(
error_type: str,
failed_tests: List[str],
module_to_test_name: Dict[str, str],
counts: Dict[str, Dict[str, int]],
details: Dict[str, Dict[str, List[str]]]
) -> None:
for test_name in failed_tests:
module = get_module(test_name, module_to_test_name)
increase(counts[error_type], module)
add_to_details(test_name, module, error_type, details)
def aggregate_failed_tests(yetus_dir: Path) -> Tuple[
Dict[str, Dict[str, int]], Dict[str, Dict[str, List[str]]]]:
"""
Aggregate failed tests from all patch-unit-*.txt files.
Returns:
Tuple of:
- counts: {error_type: {module: count}}
- details: {module: {error_type: [test_names]}}
"""
patch_files = list(yetus_dir.glob('patch-unit-*.txt'))
if not patch_files:
return {}, {}
# Aggregate results from all files
failures = []
flakes = []
errors = []
for patch_file in patch_files:
parse_patch_unit_file(patch_file, failures, flakes, errors)
if not failures and not flakes and not errors:
return {}, {}
counts = {'Failures': {}, 'Flakes': {}, 'Errors': {}}
details = {}
module_to_test_name = scan_all_tests(yetus_dir / 'archiver')
process_failed_tests('Failures', failures, module_to_test_name, counts, details)
process_failed_tests('Flakes', flakes, module_to_test_name, counts, details)
process_failed_tests('Errors', errors, module_to_test_name, counts, details)
return dict(counts), dict(details)
def generate_failed_tests_table(
counts: Dict[str, Dict[str, int]],
details: Dict[str, Dict[str, List[str]]]
) -> List[str]:
"""Generate the Failed Tests HTML table."""
total_failures = sum(sum(m.values()) for m in counts.values())
if total_failures == 0:
return []
content = [
'\n## Failed Tests\n\n',
'<table>\n',
'<thead><tr><th>Error Type</th><th>Count</th><th>Module</th><th>Tests</th></tr></thead>\n',
'<tbody>\n'
]
error_types = ['Failures', 'Flakes', 'Errors']
for error_type in error_types:
if error_type not in counts:
continue
modules = counts[error_type]
total_count = sum(modules.values())
num_modules = len(modules)
first_row = True
for module in sorted(modules.keys()):
tests = details.get(module, {}).get(error_type, [])
tests_str = '<br>'.join(sorted(set(tests))) if tests else ''
if first_row:
content.append(
f'<tr><td rowspan="{num_modules}">{error_type}</td>'
f'<td rowspan="{num_modules}">{total_count}</td>'
f'<td>{module}</td><td>{tests_str}</td></tr>\n'
)
first_row = False
else:
content.append(f'<tr><td>{module}</td><td>{tests_str}</td></tr>\n')
content.extend(['</tbody>\n', '</table>\n'])
return content
def collect_continuation_lines(
lines: List[str],
start_idx: int
) -> Tuple[List[str], int]:
"""
Collect continuation lines for a table row.
Args:
lines: All lines from the file
start_idx: Index to start checking from
Returns:
Tuple of (list of comment parts, next index to process)
"""
comment_parts = []
i = start_idx
while i < len(lines):
line = lines[i]
stripped = line.strip()
if not stripped.startswith('|'):
break
if '|| Subsystem || Report/Notes ||' in line:
break
vote, _, runtime, comment = parse_table_row(line)
# Stop at new data row
if vote in VOTE_EMOJI:
break
# Empty vote/subsystem means continuation or separator
if not vote:
if comment:
comment_parts.append(comment)
i += 1
elif runtime and is_runtime(runtime):
break
else:
i += 1
else:
break
return comment_parts, i
def process_first_table(lines: List[str], start_idx: int) -> Tuple[List[str], int]:
"""
Process the first table (Vote, Subsystem, Runtime, Comment).
Returns:
Tuple of (Markdown lines, next index to process)
"""
content = [
'\n',
'| Vote | Subsystem | Runtime | Comment |\n',
'|------|-----------|---------|---------|\n'
]
i = start_idx
# Skip the original separator line
if i < len(lines) and '===' in lines[i]:
i += 1
while i < len(lines):
line = lines[i]
stripped = line.strip()
if '|| Subsystem || Report/Notes ||' in line:
break
if stripped.startswith('+--'):
i += 1
continue
if not stripped.startswith('|'):
i += 1
continue
vote, subsystem, runtime, comment = parse_table_row(line)
# Section header (vote and subsystem are empty)
if not vote and not subsystem:
if comment:
content.append(f'| | | | {comment} |\n')
elif runtime and is_runtime(runtime):
content.append(f'| | | {runtime} | |\n')
i += 1
continue
# Data row with vote
if vote in VOTE_EMOJI:
vote_emoji = convert_vote(vote)
comment_parts = [comment] if comment else []
continuation_parts, i = collect_continuation_lines(lines, i + 1)
comment_parts.extend(continuation_parts)
comment_text = ' '.join(comment_parts)
content.append(f'| {vote_emoji} | {subsystem} | {runtime} | {comment_text} |\n')
continue
# Other cases, skip
i += 1
return content, i
def process_second_table(lines: List[str], start_idx: int) -> Tuple[List[str], int]:
"""
Process the second table (Subsystem, Report/Notes).
Returns:
Tuple of (Markdown lines, next index to process)
"""
content = [
'\n## Subsystem Reports\n\n',
'| Subsystem | Report/Notes |\n',
'|-----------|------------|\n'
]
i = start_idx
# Skip the original separator line
if i < len(lines) and '===' in lines[i]:
i += 1
while i < len(lines):
line = lines[i]
stripped = line.strip()
if not stripped.startswith('|'):
break
# Split by | and get non-empty parts
parts = [p.strip() for p in stripped.split('|') if p.strip()]
if len(parts) >= 2:
content.append(f'| {parts[0]} | {parts[1]} |\n')
i += 1
return content, i
def convert_console_to_markdown(input_dir: str, output_file: Optional[str] = None) -> str:
"""Convert Yetus console output to Markdown format."""
input_path = Path(input_dir)
if not input_path.is_dir():
print(f'Error: Input path "{input_dir}" is not a directory', file=sys.stderr)
sys.exit(1)
console_file = input_path / 'console.txt'
if not console_file.exists():
print(f'Error: console.txt not found in "{input_dir}"', file=sys.stderr)
sys.exit(1)
with open(console_file, 'r') as f:
lines = f.readlines()
content = []
i = 0
while i < len(lines):
line = lines[i]
stripped = line.strip()
if stripped == '-1 overall':
content.append(f'<h1><b>❌ {stripped}</b></h1>\n')
i += 1
elif stripped == '+1 overall':
content.append(f'<h1><b>✅ {stripped}</b></h1>\n')
i += 1
elif '| Vote |' in line and 'Subsystem' in line:
table_content, i = process_first_table(lines, i + 1)
content.extend(table_content)
counts, details = aggregate_failed_tests(input_path)
if counts:
content.extend(generate_failed_tests_table(counts, details))
elif '|| Subsystem || Report/Notes ||' in line:
table_content, i = process_second_table(lines, i + 1)
content.extend(table_content)
else:
i += 1
result = ''.join(content)
if output_file:
with open(output_file, 'w') as f:
f.write(result)
print(f'Converted {input_dir} to {output_file}', file=sys.stderr)
else:
print(result, end='')
return result
def main():
if len(sys.argv) < 2:
print(f'Usage: {sys.argv[0]} <input_directory> [output_file]', file=sys.stderr)
print(
f' input_directory: Directory containing console.txt and optional patch-unit-*.txt files',
file=sys.stderr)
print(f' If output_file is not provided, output goes to stdout', file=sys.stderr)
sys.exit(1)
input_dir = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else None
if not Path(input_dir).exists():
print(f'Error: Input directory "{input_dir}" does not exist', file=sys.stderr)
sys.exit(1)
convert_console_to_markdown(input_dir, output_file)
if __name__ == '__main__':
main()