blob: e51376a25f5d5c21cb21ebc46fc668fb29c23cab [file]
############################################################################
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
############################################################################
"""Product core class implementation."""
import re
from enum import Enum as _Enum
from enum import auto
from typing import (
TYPE_CHECKING,
Any,
List,
Optional,
Tuple,
Union,
)
from ntfc.command_builder import CommandBuilder
from ntfc.coreconfig import CoreConfig
from ntfc.device.common import CmdReturn, CmdStatus
from ntfc.log.logger import logger
if TYPE_CHECKING:
from ntfc.device.common import DeviceCommon
from ntfc.log.handler import LogHandler
from ntfc.type_defs import PatternLike
###############################################################################
# Enum: CoreStatus
###############################################################################
class CoreStatus(_Enum):
"""Core health status returned by :attr:`ProductCore.status`.
:cvar NORMAL: Core is operating normally.
:cvar CRASH: Core has crashed.
:cvar BUSYLOOP: Core is stuck in a busy loop.
:cvar FLOOD: Core is flooding output (stuck command).
:cvar NOTALIVE: Core process/device is no longer alive.
"""
NORMAL = auto()
CRASH = auto()
BUSYLOOP = auto()
FLOOD = auto()
NOTALIVE = auto()
###############################################################################
# Class: ProductCore
###############################################################################
class ProductCore:
"""This class implements product core under test."""
def __init__(
self,
device: "DeviceCommon",
conf: "CoreConfig",
ignored_cores: Optional[List[str]] = None,
) -> None:
"""Initialize product core under test.
:param device: DeviceCommon instance
:param conf: CoreConfig instance
:param ignored_cores: Core names to skip in :meth:`get_core_info`.
Defaults to ``["dsp"]``.
"""
if not device:
raise TypeError("Device instance is required")
if not isinstance(conf, CoreConfig):
raise TypeError("Config instance is required")
self._device = device
self._uptime = conf.uptime
self._name = conf.name
self._conf = conf
self._ignored_cores: List[str] = (
list(ignored_cores) if ignored_cores is not None else ["dsp"]
)
self._builder = CommandBuilder(device.prompt, device.no_cmd)
self._prompt = device.prompt
self._main_prompt = self._prompt
self._cur_prompt = self._main_prompt.decode("utf-8", errors="ignore")
# cores info not ready yet, done in self.init() method called when
# device is ready
self._core0: Optional[str] = None
self._cur_core: Optional[str] = None
self._cores: Tuple[str, ...] = ()
def __str__(self) -> str:
"""Get string for object."""
return f"ProductCore: {self._name}"
def _match_not_found(self, rematch: Optional[re.Match[Any]]) -> bool:
"""Check for 'command not found' message."""
if not rematch:
return False
matched = rematch.group().strip()
if isinstance(matched, (bytes, bytearray)):
matched = matched.decode("utf-8", errors="ignore")
return self._device.no_cmd in matched
def sendCommand( # noqa: N802
self,
cmd: str,
expects: Optional[Union[str, List[str]]] = None,
args: Optional[Union[str, List[str]]] = None,
timeout: int = 30,
flag: str = "",
match_all: bool = True,
regexp: bool = False,
fail_pattern: Optional[Union[str, List[str]]] = None,
) -> "CmdStatus":
"""Send command and wait for expected response.
:param cmd: Command to send
:param expects: List of expected responses or None
:param args: List of additional arguments to append to the command
or None
:param timeout: Timeout in seconds
:param flag: Default _prompt
:param match_all: "True" for all responses to match, "False" for any
response to match
:param regexp: "False" for str to match, "True" for regular
expression to match
:param fail_pattern: Pattern or list of patterns whose presence in the
output immediately terminates the read and returns FAILED. Respects
the ``regexp`` flag.
:raises ValueError:
:raises DeviceError: Communication error with device
:raises TimeoutError: Response timeout
:return: status : command execution status
"""
encoded = self._builder.build(
cmd, expects, args, flag, match_all, regexp, fail_pattern
)
logger.debug(
f"Sending command: {encoded.cmd.decode()}, expecting: "
f"{encoded.pattern.decode()} (timeout={timeout}s)"
)
cmdret = self._device.send_cmd_read_until_pattern(
encoded.cmd,
pattern=encoded.pattern,
timeout=timeout,
fail_pattern=encoded.fail_pattern,
)
if cmdret.valid_match() and self._match_not_found(cmdret.rematch):
return CmdStatus.NOTFOUND
return cmdret.status
def sendCommandReadUntilPattern( # noqa: N802
self,
cmd: str,
pattern: "Optional[PatternLike]" = None,
args: Optional[Union[str, List[str]]] = None,
timeout: int = 30,
fail_pattern: "Optional[PatternLike]" = None,
) -> CmdReturn:
"""Send command to device and read until a specific pattern.
:param cmd: (str or list of strs) command to send to device
:param pattern: (str, bytes, or list of (str, bytes)) String or regex
pattern to look for. If a list, patterns will be concatenated
with '.*'. The pattern will be converted to bytes for matching.
:param args: List of additional arguments to append to the command
or None
:param timeout: (int) timeout value in seconds, default 30s.
:param fail_pattern: (str, bytes, or list of (str, bytes)) Regex
pattern or list of patterns whose presence in the output immediately
terminates the read and returns FAILED.
:return: CmdReturn : command return data
"""
encoded = self._builder.build_raw(cmd, pattern, args, fail_pattern)
logger.debug(
f"Sending command: {encoded.cmd.decode()}, expecting pattern: "
f"{encoded.pattern.decode()} (timeout={timeout}s)"
)
return self._device.send_cmd_read_until_pattern(
encoded.cmd,
pattern=encoded.pattern,
timeout=timeout,
fail_pattern=encoded.fail_pattern,
)
def readUntilPattern( # noqa: N802
self,
pattern: "PatternLike",
timeout: int = 30,
fail_pattern: "Optional[PatternLike]" = None,
) -> CmdReturn:
"""Read device output until a pattern without sending a command.
Useful for catching output from an already-running program and
checking whether it passes or fails based on the patterns found.
:param pattern: (str, bytes, or list of (str, bytes)) Regex pattern to
wait for. Signals a successful outcome.
:param timeout: (int) timeout value in seconds, default 30s.
:param fail_pattern: (str, bytes, or list of (str, bytes)) Regex
pattern or list of patterns whose presence in the output immediately
terminates the read and returns FAILED.
:return: CmdReturn : command return data
"""
pattern_bytes = self._builder.encode_pattern(pattern)
fail_pattern_bytes = (
self._builder.encode_fail_pattern(fail_pattern)
if fail_pattern
else None
)
logger.debug(
f"Reading until pattern: "
f"{pattern.decode() if isinstance(pattern, bytes) else pattern} "
f"(timeout={timeout}s)"
)
return self._device.read_until_pattern(
pattern=pattern_bytes,
timeout=timeout,
fail_pattern=fail_pattern_bytes,
)
def sendCtrlCmd(self, ctrl_char: str) -> None: # noqa: N802
"""Send a control character command (e.g., Ctrl+C).
:param ctrl_char: Control character to send
"""
if len(ctrl_char) != 1:
raise ValueError(
"ctrl_char must be a single alphabetic character."
)
if self._device.send_ctrl_cmd(ctrl_char) == CmdStatus.SUCCESS:
logger.info(f"Successfully sent Ctrl+{ctrl_char}")
else:
logger.warning(f"Failed to send Ctrl+{ctrl_char}")
# REVISIT: no proc/rpmsg in nuttx/upstream!
def get_core_info(self) -> Tuple[str, ...]:
"""Retrieve CPU core information from the device.
:return: Tuple containing Local CPU as first element followed by Remote
CPUs. Returns empty tuple on failure.
"""
cmd_rpmsg = b"cat proc/rpmsg"
timeout = 5
# Send command and get raw output
cmdret = self._device.send_cmd_read_until_pattern(
cmd_rpmsg, pattern=self._main_prompt, timeout=timeout
)
if cmdret.status != 0:
logger.error(f"Command failed with return code: {cmdret.status}")
return ()
# Parse output
decoded_output = cmdret.output
lines = [line.strip() for line in decoded_output.splitlines()]
# Find header location
header_index = next(
(
i
for i, line in enumerate(lines)
if "Local CPU" in line and "Remote CPU" in line
),
-1,
)
# Early return if header not found
if header_index == -1:
logger.warning("CPU information header not found in output")
return ()
# Process data rows
core_data = []
for line in lines[header_index + 1 :]: # pragma: no cover
prompt = self._device.prompt.decode()
if line.startswith(prompt):
break # Stop at next prompt
parts = line.split()
if len(parts) >= 2:
core_data.append((parts[0], parts[1]))
# Validate and format results
if not core_data:
logger.warning("No valid CPU data found after header")
return ()
# Extract unique Local CPU (default single value)
local_cpu = core_data[0][0]
# Create result tuple (Local CPU + all Remote CPUs)
return (
local_cpu,
*(
cpu[1]
for cpu in core_data
if cpu[1] not in self._ignored_cores
),
)
def init(self) -> None:
"""Finish product initialization."""
cores = self.get_core_info()
self._core0 = cores[0] if cores else "core0"
self._cur_core = self._core0
self._cores = cores if cores else ("core0",)
logger.info(f"Current product support cores: {self._cores}")
@property
def cur_core(self) -> Optional[str]:
"""Get current core."""
return self._cur_core
@property
def cores(self) -> Tuple[str, ...]:
"""Get cores."""
return self._cores
def switch_core(self, target_core: str = "") -> int:
"""Switch the target core of the device.
:param target_core: Core to switch to.
:return: 0 on success, -1 on failure
"""
if not self._core0 and not self._cur_core and not self._cores:
raise ValueError("Product not initialized!")
if not target_core:
logger.debug(
"The target_core has no value and cannot be switched."
)
return CmdStatus.NOTFOUND
if self._core0 and target_core.lower() == self._core0.lower():
logger.warning(
"The target core is the main core, and the core"
" is not switched."
)
return CmdStatus.SUCCESS
logger.info(f"Attempting to switch to core: {target_core}")
if target_core.lower() not in tuple(
core.lower() for core in self._cores
):
logger.debug(f"There is no {target_core} core in the device")
return CmdStatus.NOTFOUND
cmd = f"cu -l /dev/tty{target_core.upper()}\n\n"
pattern = f"{target_core}>"
rc = self.sendCommand(cmd, pattern, match_all=False, timeout=5)
if rc == CmdStatus.SUCCESS:
logger.info(f"Core switch to {target_core} succeeded")
return rc
def get_current_prompt(self) -> str:
"""Dynamically obtain the device current prompt.
:return: The current prompt (e.g., ap>) (str)
or None: If the current prompt cannot be determined
"""
core_pattern = rb"(\S+)>"
matches: List[str] = []
for _ in range(5):
cmdret = self._device.send_cmd_read_until_pattern(
b"\n", core_pattern, 1
)
if cmdret.valid_match() and cmdret.rematch:
# Extract the matched core name
core_name = (
cmdret.rematch.group(1).decode("utf-8", errors="ignore")
+ ">"
)
matches.append(core_name)
# Check whether two or more of the three results are consistent
if len(matches) >= 2 and len(set(matches)) == 1:
logger.info(f"The current prompt is {matches[0]}")
return matches[0]
logger.error("Failed to get current prompt, use default prompt.")
return ">"
def reboot(self, timeout: int = 30) -> bool:
"""Reboot the device by calling the device's reboot function.
:param timeout: (int) Timeout in seconds for the reboot operation.
Default is 30 seconds.
:return: (bool) True if the reboot was successful, False otherwise.
"""
logger.info(
f"Attempting to reboot the device with " f"timeout: {timeout}s"
)
success = self._device.reboot(timeout=timeout)
if success:
logger.info("Device rebooted successfully.")
else:
logger.error("Failed to reboot the device.")
return success
def force_panic(self, timeout: int = 30) -> bool:
"""Trigger a kernel panic for debugging and core dump generation.
:return: True if the force panic was successful, False otherwise.
"""
panic_char = self.device.panic_char
if not panic_char:
logger.error("Force panic not supported.")
return False
self._device.log_event("force_panic")
ret = self.device.send_ctrl_cmd(panic_char)
if isinstance(ret, CmdStatus):
return ret == CmdStatus.SUCCESS
return bool(ret)
@property
def busyloop(self) -> bool:
"""Check if the device is in busy loop."""
return self._device.busyloop
@property
def flood(self) -> bool:
"""Check if flood condition was detected."""
return self._device.flood
@property
def crash(self) -> bool:
"""Check if the device is crashed."""
return self._device.crash
@property
def notalive(self) -> bool:
"""Check if the device is dead."""
return self._device.notalive
@property
def status(self) -> CoreStatus:
"""Check core status with all failure mode detection.
:return: :class:`CoreStatus` representing the current state.
"""
if self.crash:
return CoreStatus.CRASH
if self.busyloop:
return CoreStatus.BUSYLOOP
if self.flood:
return CoreStatus.FLOOD
if self.notalive:
return CoreStatus.NOTALIVE
return CoreStatus.NORMAL
@property
def device(self) -> "DeviceCommon":
"""Get underlying device."""
return self._device
@property
def name(self) -> Any:
"""Get product name."""
return self._name
@property
def prompt(self) -> bytes:
"""Get core prompt."""
return self._prompt
@property
def conf(self) -> "CoreConfig":
"""Get core configuration."""
return self._conf
def start_log_collect(self, logs: "LogHandler") -> None:
"""Start device log collector."""
self._device.start_log_collect(logs)
def stop_log_collect(self) -> None:
"""Stop device log collector."""
self._device.stop_log_collect()
def start(self) -> None:
"""Start device."""
self._device.start()
def check_cmd(self, cmd_pattern: str) -> bool:
"""Check if a command pattern is available in the ELF binary.
This method validates whether a specific command or set of
commands is present in the core's ELF binary by searching for
corresponding symbols. It supports both single commands and
alternative command patterns separated by '|'.
:param cmd_pattern: Command pattern to check for. Can contain
alternatives separated by '|'
(e.g., 'test1|test2')
:return: True if the command pattern is found, False otherwise
Note:
- Requires ELF parser to be available in device
- Supports alternative patterns with '|'
- Used to validate core capabilities before executing
"""
# Check if device supports command checking
if hasattr(self._device, "elf_parser") and self._device.elf_parser:
logger.debug(f"Checking command pattern: {cmd_pattern}")
# Split by '|' to support alternative patterns
alternatives = (
cmd_pattern.split("|") if "|" in cmd_pattern else [cmd_pattern]
)
for pattern in alternatives:
# For cmocka tests, append _main to symbol name
symbol_pattern_str: str = (
f"{pattern}_main" if "cmocka" in pattern else pattern
)
# Support regex wildcards
if ".*" in symbol_pattern_str:
symbol_pattern = re.compile(symbol_pattern_str)
else:
tmp = symbol_pattern_str
symbol_pattern = tmp # type: ignore[assignment]
if self._device.elf_parser.has_symbol(symbol_pattern):
return True
return False
# Fallback: try to execute the command and check if it exists
logger.warning(
"ELF parser not available, trying command check for: "
f"{cmd_pattern}"
)
# Send 'help' command to check if command exists
result = self.sendCommandReadUntilPattern("help", timeout=5)
if result.status == CmdStatus.SUCCESS:
# Check if any of the command alternatives are in the help output
alternatives = (
cmd_pattern.split("|") if "|" in cmd_pattern else [cmd_pattern]
)
for pattern in alternatives:
if pattern.lower() in result.output.lower():
return True
return False