blob: 003ed6d61bd257d6c272fa8511e41db34b37d0ba [file]
############################################################################
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
############################################################################
import os
import pty
import select
import threading
import pytest
import serial
from ntfc.coreconfig import CoreConfig
from ntfc.device.serial import DeviceSerial
@pytest.fixture
def serial_pair():
fd1, fd2 = pty.openpty()
fd2_name = os.ttyname(fd2)
return fd1, fd2_name
g_fake_device_retval = b""
def fake_device_thread(fd, stop):
# read garbage
# data = os.read(fd, 100)
while not stop.is_set():
try:
rlist, _, _ = select.select([fd], [], [], 0.2)
if rlist:
_ = os.read(fd, 100)
os.write(fd, g_fake_device_retval)
except OSError: # pragma: no cover
break
@pytest.fixture
def serial_config():
config = {
"name": "main",
"device": "serial",
"exec_path": "",
"exec_args": "",
"conf_path": "",
"elf_path": "",
}
return CoreConfig(config)
def test_device_sim_internals(serial_config, serial_pair):
ser = DeviceSerial(serial_config)
assert ser.name == "serial"
assert ser.notalive is True
# stop on non-started device is a no-op
ser.stop()
with pytest.raises(ValueError):
_ = ser._decode_exec_args("aaaa")
assert ser._decode_exec_args("1,n,8,1.5") == {
"baudrate": 1,
"bytesize": 8,
"parity": "N",
"stopbits": 1.5,
}
assert ser._dev_is_health_priv() is False
assert ser._read() == b""
assert ser._write_ctrl("a") is None
assert ser._write(b"a") is None
def test_device_sim_init(serial_config, serial_pair):
ser = DeviceSerial(serial_config)
# no path and args
with pytest.raises(serial.serialutil.SerialException):
serial_config._config["exec_path"] = ""
serial_config._config["exec_args"] = ""
ser.start()
# no boot
with pytest.raises(TimeoutError):
serial_config._config["exec_path"] = serial_pair[1]
serial_config._config["exec_args"] = ""
ser.start()
stop = threading.Event()
device_thread = threading.Thread(
target=fake_device_thread, args=(serial_pair[0], stop)
)
device_thread.start()
global g_fake_device_retval
g_fake_device_retval = b"\n\rnsh>"
ser.start()
assert ser.notalive is False
g_fake_device_retval = b""
assert ser._write(b"a") is None
assert ser._write_ctrl("a") is None
# hard reboot/poweroff: no system command configured -> False
assert ser.reboot() is False
assert ser.poweroff() is False
# soft reboot: device must respond with prompt for boot wait to succeed
g_fake_device_retval = b"\n\rnsh>"
assert ser.reboot(hard=False) is True
# soft poweroff: sends OS command, no boot wait required
assert ser.poweroff(hard=False) is True
stop.set()
device_thread.join(timeout=1)