blob: 69f68ad40c8197053ab7885486af837d3b3adbe6 [file]
############################################################################
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
############################################################################
"""Multi-session test integrator for NTFC."""
import glob
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import Any, Dict, FrozenSet, List, Optional, Tuple
from xml.etree import ElementTree
import yaml # type: ignore
from prettytable import PrettyTable
from ntfc.builder import NuttXBuilder
from ntfc.log.logger import logger
from ntfc.log.manager import LogManager
from ntfc.log.report import Reporter
from ntfc.pytest.mypytest import MyPytest
###############################################################################
# Dataclass: SessionConfig
###############################################################################
@dataclass
class SessionConfig:
"""Configuration for a single test session."""
name: str
confpath: str
testpath: str
resources: List[str] = field(default_factory=list)
exitonfail: Optional[bool] = None
loops: Optional[int] = None
timeout: Optional[int] = None
timeout_session: Optional[int] = None
modules: Optional[str] = None
###############################################################################
# Dataclass: MultiOptions
###############################################################################
@dataclass
class MultiOptions:
"""Global options for multi-session execution."""
fail_fast: bool = False
parallel: bool = False
###############################################################################
# Dataclass: SessionResult
###############################################################################
@dataclass
class SessionResult:
"""Result of a single test session execution."""
name: str
exit_code: int
result_dir: str
###############################################################################
# Class: ManifestConfig
###############################################################################
class ManifestConfig:
"""Parse and validate a multi-session manifest YAML file."""
def __init__(
self,
options: "MultiOptions",
sessions: List["SessionConfig"],
) -> None:
"""Initialize ManifestConfig.
:param options: Multi-session execution options.
:param sessions: Ordered list of session configurations.
"""
self.options = options
self.sessions = sessions
@classmethod
def load(cls, path: str) -> "ManifestConfig":
"""Load and validate manifest from YAML file.
:param path: Path to the manifest YAML file.
:return: Validated ManifestConfig instance.
:raises ValueError: If the manifest is invalid.
:raises FileNotFoundError: If the manifest file does not exist.
"""
with open(path, encoding="utf-8") as f:
data = yaml.safe_load(f)
if not isinstance(data, dict):
raise ValueError("manifest must be a YAML mapping")
options = cls._parse_options(data.get("options", {}))
sessions = cls._parse_sessions(data.get("sessions", []))
return cls(options, sessions)
@staticmethod
def _parse_options(raw_options: Any) -> "MultiOptions":
"""Parse and validate the options section.
:param raw_options: Raw options from YAML.
:return: Validated MultiOptions instance.
"""
if not isinstance(raw_options, dict):
raise ValueError("'options' must be a mapping")
return MultiOptions(
fail_fast=bool(raw_options.get("fail_fast", False)),
parallel=bool(raw_options.get("parallel", False)),
)
@staticmethod
def _parse_session(idx: int, raw: Any, names: set[str]) -> "SessionConfig":
"""Parse and validate a single session entry.
:param idx: Index of the session in the list.
:param raw: Raw session dict from YAML.
:param names: Set of already-seen session names.
:return: Validated SessionConfig instance.
"""
if not isinstance(raw, dict):
raise ValueError(f"session {idx} must be a mapping")
name = raw.get("name")
if not name or not isinstance(name, str):
raise ValueError(
f"session {idx}: 'name' is required " f"and must be a string"
)
if name in names:
raise ValueError(f"duplicate session name: '{name}'")
names.add(name)
confpath = raw.get("confpath")
if not confpath or not isinstance(confpath, str):
raise ValueError(f"session '{name}': 'confpath' is required")
testpath = raw.get("testpath")
if not testpath or not isinstance(testpath, str):
raise ValueError(f"session '{name}': 'testpath' is required")
resources = raw.get("resources", [])
if not isinstance(resources, list):
raise ValueError(f"session '{name}': 'resources' must be a list")
return SessionConfig(
name=name,
confpath=confpath,
testpath=testpath,
resources=[str(r) for r in resources],
exitonfail=raw.get("exitonfail"),
loops=raw.get("loops"),
timeout=raw.get("timeout"),
timeout_session=raw.get("timeout_session"),
modules=raw.get("modules"),
)
@classmethod
def _parse_sessions(cls, raw_sessions: Any) -> List["SessionConfig"]:
"""Parse and validate the sessions list.
:param raw_sessions: Raw sessions list from YAML.
:return: List of validated SessionConfig instances.
"""
if not isinstance(raw_sessions, list) or not raw_sessions:
raise ValueError("'sessions' must be a non-empty list")
sessions: List[SessionConfig] = []
names: set[str] = set()
for idx, raw in enumerate(raw_sessions):
sessions.append(cls._parse_session(idx, raw, names))
return sessions
def resolve_session_config(
self, session: "SessionConfig"
) -> Dict[str, Any]:
"""Load session YAML config and apply per-session overrides.
:param session: Session configuration to resolve.
:return: Configuration dictionary.
"""
with open(session.confpath, encoding="utf-8") as f:
conf: Dict[str, Any] = yaml.safe_load(f) or {}
# apply per-session overrides
cfg = conf.setdefault("config", {})
if session.loops is not None:
cfg["loops"] = session.loops
if session.timeout is not None:
cfg["timeout"] = session.timeout
if session.timeout_session is not None:
cfg["timeout_session"] = session.timeout_session
return conf
###############################################################################
# Class: MultiSessionRunner
###############################################################################
class MultiSessionRunner:
"""Orchestrate multi-session build, test, and report merging."""
def __init__(
self,
manifest: "ManifestConfig",
rebuild: bool = True,
verbose: bool = False,
debug: bool = False,
logcfg: Optional[str] = None,
) -> None:
"""Initialize MultiSessionRunner.
:param manifest: Parsed manifest configuration.
:param rebuild: Force rebuild of all configurations.
:param verbose: Enable verbose output.
:param debug: Enable debug output.
:param logcfg: Path to log configuration file.
"""
self._manifest = manifest
self._rebuild = rebuild
self._verbose = verbose
self._debug = debug
self._logcfg = logcfg
def run(self) -> int:
"""Execute the full multi-session pipeline.
:return: 0 if all sessions passed, 1 otherwise.
"""
# Phase 1: Build all configurations
built_configs = self._phase_build()
if built_configs is None:
return 1
# Create shared session directory for all sessions
log_manager = LogManager(self._logcfg)
log_manager.cleanup()
self._session_dir = log_manager.new_session_dir()
# Phase 2: Run all test sessions
results = self._phase_test(built_configs)
# Phase 3: Merge reports into the shared session directory
self._phase_report(results)
# Print final summary
self._print_summary(results)
# Aggregate exit codes
for r in results:
if r.exit_code != 0:
return 1
return 0
@staticmethod
def _parse_session_counts(
result_dir: str,
) -> Tuple[int, int, int, int, float]:
"""Parse test counts from a session's report XML files.
The Reporter splits the original ``report.xml`` into per-module
files under ``<result_dir>/report/`` and deletes the original.
This method reads those per-module files. Falls back to
``report.xml`` if the split directory does not exist.
:param result_dir: Path to session result directory.
:return: Tuple of (passes, failures, skipped, errors, time).
"""
# Reporter splits report.xml into per-module files here
report_dir = os.path.join(result_dir, "report")
xml_files: List[str] = []
if os.path.isdir(report_dir):
xml_files = sorted(glob.glob(os.path.join(report_dir, "*.xml")))
# fallback to unsplit report.xml
if not xml_files:
single = os.path.join(result_dir, "report.xml")
if os.path.exists(single):
xml_files = [single]
if not xml_files:
return 0, 0, 0, 0, 0.0
passes = 0
failures = 0
skipped_count = 0
errors = 0
total_time = 0.0
for xml_path in xml_files:
try:
root = ElementTree.parse(xml_path).getroot()
except ElementTree.ParseError:
continue
for ts in root.iter("testsuite"):
total = int(ts.attrib.get("tests", 0))
f = int(ts.attrib.get("failures", 0))
s = int(ts.attrib.get("skipped", 0))
e = int(ts.attrib.get("errors", 0))
t = float(ts.attrib.get("time", 0))
passes += max(total - f - s, 0)
failures += f
skipped_count += s
errors += e
total_time += t
return passes, failures, skipped_count, errors, total_time
def _print_summary(self, results: List["SessionResult"]) -> None:
"""Print a summary table of all session results.
:param results: List of session results.
"""
table = PrettyTable()
table.field_names = [
"Session",
"Result",
"Pass",
"Fail",
"Skip",
"Error",
"Time",
]
table.align["Session"] = "l"
sess_passed = 0
sess_failed = 0
sess_skipped = 0
total_pass = 0
total_fail = 0
total_skip = 0
total_error = 0
total_time = 0.0
for r in results:
if r.exit_code == 0:
status = "PASS"
sess_passed += 1
elif r.exit_code == -1:
status = "SKIP"
sess_skipped += 1
else:
status = "FAIL"
sess_failed += 1
p, f, s, e, t = self._parse_session_counts(r.result_dir)
total_pass += p
total_fail += f
total_skip += s
total_error += e
total_time += t
table.add_row([r.name, status, p, f, s, e, f"{t:.2f}"])
table.add_row(
[
"Total",
"",
total_pass,
total_fail,
total_skip,
total_error,
f"{total_time:.2f}",
]
)
total = len(results)
summary = (
f"[MULTI_SUMMARY] sessions:{total} "
f"passed:{sess_passed} failed:{sess_failed} "
f"skipped:{sess_skipped}"
)
print(f"\n\n{table}\n{summary}")
def _build_key(self, conf: Dict[str, Any]) -> FrozenSet[Tuple[str, str]]:
"""Compute deduplication key from config.
:param conf: Configuration dictionary.
:return: Frozenset of identifying tuples.
"""
items: list[Tuple[str, str]] = []
# collect all defconfigs and build_env from all products/cores
for key in conf:
if "product" not in key:
continue
cores = conf[key].get("cores", {})
for core_name in sorted(cores):
core = cores[core_name]
defconfig = core.get("defconfig", "")
if defconfig:
items.append((f"{key}.{core_name}.defconfig", defconfig))
# include global build_env
build_env = conf.get("config", {}).get("build_env", {})
for k, v in sorted(build_env.items()):
items.append((f"build_env.{k}", str(v)))
# include kv overrides
kv = conf.get("config", {}).get("kv", {})
for k, v in sorted(kv.items()):
items.append((f"kv.{k}", str(v)))
return frozenset(items)
def _phase_build(
self,
) -> Optional[Dict[str, Dict[str, Any]]]:
"""Build all unique configurations.
:return: Mapping of session name to built config, or None
on build failure.
"""
logger.info("[Multi] Phase 1: Building all configurations")
built_configs: Dict[str, Dict[str, Any]] = {}
build_cache: Dict[FrozenSet[Tuple[str, str]], Dict[str, Any]] = {}
for session in self._manifest.sessions:
conf = self._manifest.resolve_session_config(session)
bkey = self._build_key(conf)
if bkey in build_cache:
logger.info(
f"[Multi] Reusing build for session " f"'{session.name}'"
)
built_configs[session.name] = build_cache[bkey]
continue
# set loops default
conf.setdefault("config", {}).setdefault("loops", 1)
builder = NuttXBuilder(conf, self._rebuild)
if builder.need_build():
try:
builder.build_all()
except Exception as e:
logger.error(
f"[Multi] Build failed for session "
f"'{session.name}': {e}"
)
return None
conf = builder.new_conf()
build_cache[bkey] = conf
built_configs[session.name] = conf
logger.info(
f"[Multi] All {len(self._manifest.sessions)} "
f"configurations built successfully"
)
return built_configs
def _phase_test(
self, built_configs: Dict[str, Dict[str, Any]]
) -> List["SessionResult"]:
"""Run test sessions with resource-aware scheduling.
:param built_configs: Mapping of session name to config.
:return: List of session results.
"""
logger.info("[Multi] Phase 2: Running test sessions")
if self._manifest.options.parallel:
return self._run_parallel(built_configs)
return self._run_sequential(built_configs)
def _run_session(
self,
session: "SessionConfig",
conf: Dict[str, Any],
fail_event: Optional[threading.Event] = None,
) -> "SessionResult":
"""Run a single test session.
:param session: Session configuration.
:param conf: Resolved and built configuration dict.
:param fail_event: Optional event to check for early abort.
:return: Session result.
"""
# check fail_fast abort
if fail_event and fail_event.is_set():
return SessionResult(
name=session.name, exit_code=-1, result_dir=""
)
exitonfail = session.exitonfail or False
modules = None
if session.modules:
modules = session.modules.replace(",", " ").split()
pt = MyPytest(conf, exitonfail, self._verbose, modules=modules)
logger.info(f"[Multi] Running session '{session.name}'")
session_result_dir = os.path.join(self._session_dir, session.name)
result: Dict[str, Any] = {
"logcfg": self._logcfg,
"result_dir": session_result_dir,
}
exit_code = pt.runner(session.testpath, result)
# read result_dir from the instance, not global pytest module
result_dir: str = pt.result_dir
if fail_event and exit_code != 0 and self._manifest.options.fail_fast:
fail_event.set()
return SessionResult(
name=session.name,
exit_code=int(exit_code),
result_dir=result_dir,
)
def _run_sequential(
self, built_configs: Dict[str, Dict[str, Any]]
) -> List["SessionResult"]:
"""Run all sessions sequentially.
:param built_configs: Mapping of session name to config.
:return: List of session results.
"""
results: List[SessionResult] = []
for session in self._manifest.sessions:
conf = built_configs[session.name]
sr = self._run_session(session, conf)
results.append(sr)
if self._manifest.options.fail_fast and sr.exit_code != 0:
logger.info(
f"[Multi] fail_fast: stopping after "
f"session '{session.name}'"
)
break
return results
def _run_parallel(
self, built_configs: Dict[str, Dict[str, Any]]
) -> List["SessionResult"]:
"""Run sessions in parallel with resource-aware scheduling.
Sessions with overlapping resources are serialized via locks.
Sessions with no resource conflicts run concurrently.
:param built_configs: Mapping of session name to config.
:return: List of session results.
"""
# create one lock per unique resource
resource_locks: Dict[str, threading.Lock] = {}
for session in self._manifest.sessions:
for res in session.resources:
if res not in resource_locks:
resource_locks[res] = threading.Lock()
fail_event = threading.Event()
results: List[SessionResult] = []
results_lock = threading.Lock()
def _worker(session: "SessionConfig") -> "SessionResult":
# acquire all resource locks for this session
locks = sorted(
set(
resource_locks[r]
for r in session.resources
if r in resource_locks
),
key=id,
)
for lock in locks:
lock.acquire()
try:
conf = built_configs[session.name]
return self._run_session(session, conf, fail_event)
finally:
for lock in locks:
lock.release()
max_workers = max(1, len(self._manifest.sessions))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(_worker, s): s for s in self._manifest.sessions
}
for future in as_completed(futures):
sr = future.result()
with results_lock:
results.append(sr)
# sort results by original session order
order = {s.name: i for i, s in enumerate(self._manifest.sessions)}
results.sort(key=lambda r: order.get(r.name, 0))
return results
def _phase_report(self, results: List["SessionResult"]) -> None:
"""Merge sub-session reports into a unified report.
:param results: List of session results from test phase.
"""
logger.info("[Multi] Phase 3: Merging reports")
reporter = Reporter()
self._merge_session_reports(self._session_dir, results, reporter)
@staticmethod
def _copy_testcase(
parent: ElementTree.Element,
tc: ElementTree.Element,
session_name: str,
) -> None:
"""Copy a testcase element with session-namespaced classname.
:param parent: Parent testsuite element.
:param tc: Source testcase element.
:param session_name: Session name for namespacing.
"""
new_tc = ElementTree.SubElement(parent, "testcase")
for attr, val in tc.attrib.items():
new_tc.set(attr, val)
orig_cn = tc.get("classname", "")
new_tc.set("classname", f"{session_name}::{orig_cn}")
for child in tc:
new_child = ElementTree.SubElement(new_tc, child.tag)
for attr, val in child.attrib.items():
new_child.set(attr, val)
if child.text:
new_child.text = child.text
@classmethod
def _merge_one_session(
cls,
root: ElementTree.Element,
sr: "SessionResult",
) -> None:
"""Merge one session's report.xml into the root element.
:param root: Root testsuites element to append to.
:param sr: Session result with result_dir path.
"""
xml_path = os.path.join(sr.result_dir, "report.xml")
if not os.path.exists(xml_path):
return
try:
tree = ElementTree.parse(xml_path)
except ElementTree.ParseError:
logger.warning(f"[Multi] Failed to parse {xml_path}")
return
src_root = tree.getroot()
for testsuite in src_root.findall(".//testsuite"):
new_suite = ElementTree.SubElement(root, "testsuite")
for attr, val in testsuite.attrib.items():
new_suite.set(attr, val)
orig_name = testsuite.get("name", "")
new_suite.set("name", f"{sr.name}::{orig_name}")
for tc in testsuite.findall("testcase"):
cls._copy_testcase(new_suite, tc, sr.name)
@classmethod
def _merge_session_reports(
cls,
master_dir: str,
results: List["SessionResult"],
reporter: "Reporter",
) -> None:
"""Merge sub-session JUnit XML reports into unified report.
:param master_dir: Master session directory.
:param results: List of session results.
:param reporter: Reporter instance for summary generation.
"""
root = ElementTree.Element("testsuites")
for sr in results:
cls._merge_one_session(root, sr)
# write merged report
merged_path = os.path.join(master_dir, "report.xml")
merged_tree = ElementTree.ElementTree(root)
merged_tree.write(merged_path)
# generate unified summary using existing reporter
reporter.generate_result_summary(master_dir)