blob: a9370644da2e8dd7fdb27b7a005b1008c23b7b12 [file]
############################################################################
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
############################################################################
"""Serial-based device implementation."""
from typing import TYPE_CHECKING, Any, Dict
import serial # type: ignore
from ntfc.log.logger import logger
from .common import DeviceCommon
if TYPE_CHECKING:
from ntfc.coreconfig import CoreConfig
###############################################################################
# Class: DeviceSerial
###############################################################################
class DeviceSerial(DeviceCommon):
    """Device implementation that talks to the target over a serial port."""

    def __init__(self, conf: "CoreConfig"):
        """Initialize the serial device.

        :param conf: (CoreConfig) Core configuration, forwarded to
            ``DeviceCommon``.
        """
        DeviceCommon.__init__(self, conf, echo=False)
        # pyserial port handle: None until _start_impl() opens the port
        # and None again once _stop_impl() has closed it.
        self._ser = None

    def _decode_exec_args(self, args: str) -> Dict[str, Any]:
        """Decode a serial port configuration string.

        The expected format is ``"<baud>,<parity>,<data_bits>,<stop_bits>"``
        (for example ``"115200,n,8,1"``).  Whitespace around each field is
        ignored and the parity letter is case-insensitive.  A parity,
        data-bits or stop-bits value that is not in the lookup tables falls
        back to no parity, 8 data bits and 1 stop bit respectively.

        :param args: (str) Comma separated serial configuration.
        :return: (dict) Keyword arguments for ``serial.Serial``:
            ``baudrate``, ``parity``, ``bytesize`` and ``stopbits``.
        :raises ValueError: If the string does not have exactly four
            fields or a numeric field cannot be parsed.
        """
        parity_map = {
            "N": serial.PARITY_NONE,
            "E": serial.PARITY_EVEN,
            "O": serial.PARITY_ODD,
            "M": serial.PARITY_MARK,
            "S": serial.PARITY_SPACE,
        }
        bytesize_map = {
            5: serial.FIVEBITS,
            6: serial.SIXBITS,
            7: serial.SEVENBITS,
            8: serial.EIGHTBITS,
        }
        stopbits_map = {
            1: serial.STOPBITS_ONE,
            1.5: serial.STOPBITS_ONE_POINT_FIVE,
            2: serial.STOPBITS_TWO,
        }

        try:
            # strip each field so "115200, n, 8, 1" is decoded the same
            # way as "115200,n,8,1"
            baud, parity, data_bits, stop_bits = (
                field.strip() for field in args.split(",")
            )
            return {
                "baudrate": int(baud),
                "parity": parity_map.get(
                    parity.upper(), serial.PARITY_NONE
                ),
                "bytesize": bytesize_map.get(
                    int(data_bits), serial.EIGHTBITS
                ),
                "stopbits": stopbits_map.get(
                    float(stop_bits), serial.STOPBITS_ONE
                ),
            }
        except (AttributeError, TypeError, ValueError) as e:
            raise ValueError(f"Invalid format '{args}': {e}") from e

    def _dev_is_health_priv(self) -> bool:
        """Check if the serial device is OK.

        :return: (bool) True when a serial port handle is present.
        """
        return bool(self._ser)

    def _write(self, data: bytes) -> None:
        """Write to the serial device.

        Nothing is written when ``dev_is_health()`` reports a problem.
        A trailing newline is appended when ``data`` does not end with
        one; this includes empty ``data``, for which only the newline is
        sent.

        :param data: (bytes) Payload to send.
        """
        if not self.dev_is_health():
            return

        assert self._ser

        # send char by char to avoid line length full
        for c in data:
            self._ser.write(bytes([c]))

        # add new line if missing (endswith() is safe for empty data,
        # unlike indexing data[-1])
        if not data.endswith(b"\n"):
            self._ser.write(b"\n")  # pragma: no cover

        # read all garbage left by character echo
        echoed = self._read_all(timeout=0)
        self._console_log(echoed)

    def _write_ctrl(self, c: str) -> None:
        """Write a control character to the serial device.

        The byte sent is ``ord(c.upper()) - 64``, so ``"c"`` and ``"C"``
        both send 0x03.  Nothing is written when ``dev_is_health()``
        reports a problem.

        :param c: (str) Single character naming the control code.
        """
        if not self.dev_is_health():
            return

        assert self._ser

        code = ord(c.upper()) - 64
        self._ser.write(bytes([code]))

    def _read(self) -> bytes:
        """Read data from the serial device.

        :return: (bytes) Up to 1024 bytes read from the port, or ``b""``
            when ``dev_is_health()`` reports a problem.
        """
        if not self.dev_is_health():
            return b""

        assert self._ser

        return self._ser.read(size=1024)

    def _start_impl(self) -> None:
        """Start serial communication implementation.

        Opens the port at ``exec_path`` from the configuration, using the
        settings decoded from ``exec_args`` when present, then requests a
        reboot and waits for the device to boot.

        :raises ValueError: If ``exec_args`` cannot be decoded.
        :raises TimeoutError: If ``_wait_for_boot()`` returns False.
        """
        # the port is opened with a read timeout of 0
        timeout = 0
        path = self._conf.exec_path
        args = self._conf.exec_args

        logger.info(f"serial path: {path}")
        logger.info(f"serial args: {args}")

        if args:  # pragma: no cover
            port_kwargs = self._decode_exec_args(args)
            self._ser = serial.Serial(path, timeout=timeout, **port_kwargs)
        else:
            self._ser = serial.Serial(path, timeout=timeout)

        # reboot device if possible
        self.reboot()

        if self._wait_for_boot() is False:
            raise TimeoutError("device boot timeout")

    @property
    def name(self) -> str:
        """Get device name."""
        return "serial"

    @property
    def notalive(self) -> bool:
        """Check if the device is dead.

        :return: (bool) True when no serial port handle is present.
        """
        return not self._ser

    def _stop_impl(self) -> None:
        """Stop serial device and close the port."""
        if self._ser:  # pragma: no cover
            self._ser.close()
            self._ser = None
            logger.info("serial device closed")

    def _poweroff_impl(self) -> bool:
        """Hardware poweroff: run the system poweroff command from config.

        :return: (bool) True on success, False if no poweroff command
            configured.
        """
        if self._conf.poweroff:  # pragma: no cover
            logger.info("poweroff core")
            self._system_cmd(self._conf.poweroff)
            self.clear_fault_flags()
            return True

        return False

    def _reboot_impl(self, timeout: int = 1) -> bool:
        """Hardware reboot: run the system reboot command from config.

        :param timeout: (int) Unused for serial devices.
        :return: (bool) True on success, False if no reboot command
            configured.
        """
        if self._conf.reboot:  # pragma: no cover
            logger.info("reboot core")
            self._system_cmd(self._conf.reboot)
            self.clear_fault_flags()
            return True

        return False