blob: a31d25a121e332f70245940a0f1da790937b4aac [file]
############################################################################
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
############################################################################
from unittest.mock import patch
import pytest
from ntfc.device.state import CrashType, DeviceState, DeviceStateManager
class TestCrashType:
    """Sanity checks on the members exposed by ``CrashType``."""

    def test_all_crash_types_exist(self):
        """The set of member names is exactly the four expected ones."""
        expected_names = {"UNKNOWN", "ASSERTION", "SEGFAULT", "PANIC"}
        assert {member.name for member in CrashType} == expected_names

    def test_types_are_unique(self):
        """No two iterated members carry the same value."""
        # NOTE(review): if CrashType is an enum.Enum, iteration already
        # skips aliases, so this check cannot see duplicated values --
        # confirm whether __members__ should be inspected instead.
        member_values = [member.value for member in CrashType]
        assert len(set(member_values)) == len(member_values)
class TestDeviceState:
    """Sanity checks on the members exposed by ``DeviceState``."""

    def test_all_states_exist(self):
        """The set of member names is exactly the four expected ones."""
        expected_names = {"NORMAL", "BUSY_LOOP", "CRASHED", "UNHEALTHY"}
        assert {member.name for member in DeviceState} == expected_names

    def test_states_are_unique(self):
        """No two iterated members carry the same value."""
        # NOTE(review): if DeviceState is an enum.Enum, iteration already
        # skips aliases, so this check cannot see duplicated values --
        # confirm whether __members__ should be inspected instead.
        member_values = [member.value for member in DeviceState]
        assert len(set(member_values)) == len(member_values)
class TestDeviceStateManagerInit:
    """What a default-constructed ``DeviceStateManager`` reports."""

    def test_starts_normal(self):
        """A new manager is NORMAL, healthy, and flags nothing else."""
        manager = DeviceStateManager()
        assert manager.get_current_state() is DeviceState.NORMAL
        assert manager.is_healthy() is True
        # Every "something is wrong" predicate must start out False.
        negative_predicates = (
            manager.is_crashed,
            manager.is_busy_loop,
            manager.is_unhealthy,
        )
        for predicate in negative_predicates:
            assert predicate() is False

    def test_initial_crash_type_unknown(self):
        """Before any crash is recorded the crash type is UNKNOWN."""
        initial_type = DeviceStateManager().get_crash_type()
        assert initial_type is CrashType.UNKNOWN
class TestDeviceStateManagerTransitions:
    """Effects of the explicit ``set_*`` / ``reset_all_states`` calls."""

    def test_set_crashed(self):
        """set_crashed() makes the manager crashed, unhealthy, not healthy."""
        manager = DeviceStateManager()
        manager.set_crashed("panic")
        assert manager.is_crashed() is True
        assert manager.is_unhealthy() is True
        assert manager.is_healthy() is False

    def test_set_busy_loop(self):
        """set_busy_loop() makes the manager busy-looping and unhealthy."""
        manager = DeviceStateManager()
        manager.set_busy_loop("no output")
        assert manager.is_busy_loop() is True
        assert manager.is_unhealthy() is True

    def test_set_unhealthy(self):
        """set_unhealthy() flags unhealthy without crashed or busy-loop."""
        manager = DeviceStateManager()
        manager.set_unhealthy("flood")
        assert manager.is_unhealthy() is True
        assert manager.is_crashed() is False
        assert manager.is_busy_loop() is False

    def test_set_normal(self):
        """set_normal() after a crash reports healthy again."""
        manager = DeviceStateManager()
        manager.set_crashed("x")
        manager.set_normal("recovered")
        assert manager.is_healthy() is True
        assert manager.is_unhealthy() is False

    def test_set_crashed_with_crash_type(self):
        """An ASSERTION crash type passed to set_crashed() is reported back."""
        manager = DeviceStateManager()
        manager.set_crashed("oops", crash_type=CrashType.ASSERTION)
        reported = manager.get_crash_type()
        assert reported is CrashType.ASSERTION

    def test_set_crashed_segfault_type(self):
        """A SEGFAULT crash type passed to set_crashed() is reported back."""
        manager = DeviceStateManager()
        manager.set_crashed("seg", crash_type=CrashType.SEGFAULT)
        reported = manager.get_crash_type()
        assert reported is CrashType.SEGFAULT

    def test_set_crashed_panic_type(self):
        """A PANIC crash type passed to set_crashed() is reported back."""
        manager = DeviceStateManager()
        manager.set_crashed("panic", crash_type=CrashType.PANIC)
        reported = manager.get_crash_type()
        assert reported is CrashType.PANIC

    def test_set_crashed_default_crash_type_unknown(self):
        """Omitting crash_type leaves the reported type at UNKNOWN."""
        manager = DeviceStateManager()
        manager.set_crashed("x")
        reported = manager.get_crash_type()
        assert reported is CrashType.UNKNOWN

    def test_reset_all_states(self):
        """reset_all_states() restores health and clears the crash type."""
        manager = DeviceStateManager()
        manager.set_crashed("x", crash_type=CrashType.PANIC)
        manager.reset_all_states()
        assert manager.is_healthy() is True
        assert manager.is_crashed() is False
        assert manager.get_crash_type() is CrashType.UNKNOWN
class TestDeviceStateManagerIsUnhealthy:
    """Table-driven check of ``is_unhealthy()`` after each setter."""

    @pytest.mark.parametrize(
        ("setter", "expected"),
        [
            ("set_crashed", True),
            ("set_busy_loop", True),
            ("set_unhealthy", True),
            ("set_normal", False),
        ],
    )
    def test_is_unhealthy_per_state(self, setter, expected):
        """Call the named setter on a fresh manager, then compare."""
        manager = DeviceStateManager()
        # Look the setter up by name so one test body covers every row.
        apply_state = getattr(manager, setter)
        apply_state("reason")
        assert manager.is_unhealthy() is expected
class TestDeviceStateManagerActivity:
    """Activity tracking and ``check_busy_loop_timeout()`` behaviour."""

    def test_no_activity_returns_false(self):
        """With no activity ever recorded the timeout check is False."""
        manager = DeviceStateManager(busyloop_threshold=1.0)
        timed_out = manager.check_busy_loop_timeout()
        assert timed_out is False

    def test_recent_activity_returns_false(self):
        """Activity recorded just now is well inside a 60 s threshold."""
        manager = DeviceStateManager(busyloop_threshold=60.0)
        manager.update_activity()
        timed_out = manager.check_busy_loop_timeout()
        assert timed_out is False

    def test_exceeded_threshold_returns_true(self):
        """A clock 61 s past the last activity trips a 60 s threshold."""
        manager = DeviceStateManager(busyloop_threshold=60.0)
        manager.update_activity()
        last_seen = manager._last_activity_time
        # Replace the ``time`` name inside ntfc.device.state so the check
        # sees a clock reading just beyond the threshold.
        with patch("ntfc.device.state.time") as fake_time:
            fake_time.time.return_value = last_seen + 61.0
            timed_out = manager.check_busy_loop_timeout()
        assert timed_out is True
        assert manager.is_busy_loop() is True

    def test_reset_clears_activity_time(self):
        """reset_all_states() sets the stored activity time back to None."""
        manager = DeviceStateManager(busyloop_threshold=1.0)
        manager.update_activity()
        assert manager._last_activity_time is not None
        manager.reset_all_states()
        assert manager._last_activity_time is None
# Crash-signature table passed as ``crash_signatures=`` by the crash
# detection tests: each crash type maps to a one-element list holding the
# byte marker those tests embed in their sample output.
_SIGNATURES: dict = {
    crash_type: [marker]
    for crash_type, marker in (
        (CrashType.ASSERTION, b"Assertion failed"),
        (CrashType.SEGFAULT, b"up_dump_register"),
        (CrashType.PANIC, b"dump_tasks"),
    )
}
class TestDeviceStateManagerCrashDetection:
    """Signature matching via ``_detect_crash_type`` and ``check_crash``."""

    def test_detect_crash_type_assertion(self):
        """Output containing the assertion marker maps to ASSERTION."""
        detector = DeviceStateManager(crash_signatures=_SIGNATURES)
        detected = detector._detect_crash_type(
            b"Assertion failed at file.c:42"
        )
        assert detected is CrashType.ASSERTION

    def test_detect_crash_type_segfault(self):
        """Output containing the register-dump marker maps to SEGFAULT."""
        detector = DeviceStateManager(crash_signatures=_SIGNATURES)
        detected = detector._detect_crash_type(b"up_dump_register: EAX=0x0")
        assert detected is CrashType.SEGFAULT

    def test_detect_crash_type_panic(self):
        """Output containing the task-dump marker maps to PANIC."""
        detector = DeviceStateManager(crash_signatures=_SIGNATURES)
        detected = detector._detect_crash_type(b"dump_tasks: PID NAME")
        assert detected is CrashType.PANIC

    def test_detect_crash_type_none(self):
        """Output with no known marker yields None."""
        detector = DeviceStateManager(crash_signatures=_SIGNATURES)
        detected = detector._detect_crash_type(
            b"normal output, no crash here"
        )
        assert detected is None

    def test_detect_crash_type_empty_output(self):
        """Empty output yields None."""
        detector = DeviceStateManager(crash_signatures=_SIGNATURES)
        detected = detector._detect_crash_type(b"")
        assert detected is None

    def test_detect_crash_type_no_signatures(self):
        """Without configured signatures nothing is ever detected."""
        detector = DeviceStateManager()
        detected = detector._detect_crash_type(b"Assertion failed")
        assert detected is None

    def test_check_crash_assertion(self):
        """A matching check_crash() returns True and records the crash."""
        manager = DeviceStateManager(crash_signatures=_SIGNATURES)
        matched = manager.check_crash(b"Assertion failed at file.c:10")
        assert matched is True
        assert manager.is_crashed() is True
        assert manager.get_crash_type() is CrashType.ASSERTION

    def test_check_crash_no_match(self):
        """A non-matching check_crash() returns False and changes nothing."""
        manager = DeviceStateManager(crash_signatures=_SIGNATURES)
        matched = manager.check_crash(b"all good, running normally")
        assert matched is False
        assert manager.is_crashed() is False

    def test_check_crash_no_signatures(self):
        """check_crash() returns False when no signatures are configured."""
        manager = DeviceStateManager()
        matched = manager.check_crash(b"Assertion failed")
        assert matched is False

    def test_check_crash_duplicate_skips_callback(self):
        """Two consecutive crash matches trigger the callback only once."""
        transitions: list = []

        def remember(old, new, reason):
            transitions.append((old, new, reason))

        manager = DeviceStateManager(
            crash_signatures=_SIGNATURES,
            on_state_change=remember,
        )
        manager.check_crash(b"Assertion failed at a.c:1")
        manager.check_crash(b"Assertion failed at b.c:2")
        assert len(transitions) == 1
class TestDeviceStateManagerCallback:
    """Behaviour of the optional ``on_state_change`` callback."""

    def test_callback_called_on_transition(self):
        """NORMAL -> CRASHED is reported to the callback exactly once."""
        seen = []

        def recorder(old, new, reason):
            seen.append((old, new))

        manager = DeviceStateManager(on_state_change=recorder)
        manager.set_crashed("oops")
        expected = [(DeviceState.NORMAL, DeviceState.CRASHED)]
        assert seen == expected

    def test_callback_not_called_on_duplicate(self):
        """Re-entering the current state does not fire the callback."""
        seen = []

        def recorder(old, new, reason):
            seen.append((old, new))

        manager = DeviceStateManager(on_state_change=recorder)
        manager.set_crashed("first")
        # Second call targets the state we are already in: expected no-op.
        manager.set_crashed("second")
        assert len(seen) == 1

    def test_callback_exception_does_not_propagate(self):
        """A raising callback neither escapes nor blocks the transition."""

        def failing_callback(old, new, reason):
            raise RuntimeError("boom")

        manager = DeviceStateManager(on_state_change=failing_callback)
        # The call below must complete without raising.
        manager.set_crashed("x")
        assert manager.is_crashed() is True

    def test_no_callback_no_error(self):
        """Passing ``on_state_change=None`` is accepted and harmless."""
        manager = DeviceStateManager(on_state_change=None)
        # The call below must complete without raising.
        manager.set_crashed("x")
        assert manager.is_crashed() is True