| ############################################################################ |
| # SPDX-License-Identifier: Apache-2.0 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. The |
| # ASF licenses this file to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance with the |
| # License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| # |
| ############################################################################ |
| |
| """Tests for device state monitoring and heartbeat detection.""" |
| |
| import time |
| from unittest.mock import MagicMock |
| |
| from ntfc.device.common import CmdReturn, CmdStatus |
| from ntfc.device.state import DeviceState, DeviceStateManager |
| |
| |
| class TestDeviceStateManager: |
| """Test DeviceStateManager class.""" |
| |
| def test_state_manager_creation(self): |
| """Test basic state manager creation.""" |
| manager = DeviceStateManager() |
| assert manager is not None |
| assert manager._heartbeat._heartbeat_enabled is False |
| assert manager._heartbeat._heartbeat_interval == 60 |
| assert manager._heartbeat._heartbeat_threshold == 3 |
| |
| def test_enable_disable_heartbeat(self): |
| """Test enabling and disabling heartbeat monitoring.""" |
| manager = DeviceStateManager() |
| |
| # Enable heartbeat |
| manager.enable_heartbeat(interval=30, threshold=2) |
| assert manager._heartbeat._heartbeat_enabled is True |
| assert manager._heartbeat._heartbeat_interval == 30 |
| assert manager._heartbeat._heartbeat_threshold == 2 |
| |
| # Disable heartbeat |
| manager.disable_heartbeat() |
| assert manager._heartbeat._heartbeat_enabled is False |
| |
| def test_update_activity(self): |
| """Test activity update resets timers.""" |
| manager = DeviceStateManager() |
| |
| # Initially no activity |
| initial_time = manager._last_activity_time |
| |
| # Wait a bit |
| time.sleep(0.1) |
| |
| # Update activity |
| manager.update_activity() |
| assert manager._last_activity_time is not None |
| assert manager._last_activity_time != initial_time |
| assert manager._heartbeat._heartbeat_failures == 0 |
| |
| def test_check_state_normal(self): |
| """Test checking normal device state.""" |
| manager = DeviceStateManager() |
| manager.update_activity() # Mark as active |
| |
| state = manager.get_current_state() |
| assert state == DeviceState.NORMAL |
| |
| def test_get_status(self): |
| """Test getting manager status.""" |
| manager = DeviceStateManager() |
| manager.enable_heartbeat(interval=30, threshold=2) |
| |
| # Verify heartbeat is enabled |
| assert manager._heartbeat._heartbeat_enabled is True |
| assert manager._heartbeat._heartbeat_interval == 30 |
| assert manager._heartbeat._heartbeat_threshold == 2 |
| |
| manager.disable_heartbeat() |
| |
| |
| class TestDeviceHeartbeatIntegration: |
| """Test heartbeat monitoring integration with device.""" |
| |
| def test_device_enable_heartbeat(self, device_dummy): |
| """Test enabling heartbeat monitoring on device.""" |
| device_dummy.enable_heartbeat_monitoring(interval=30, threshold=2) |
| |
| # Verify heartbeat is enabled via state manager |
| assert device_dummy._state_mgr._heartbeat._heartbeat_enabled is True |
| assert device_dummy._state_mgr._heartbeat._heartbeat_interval == 30 |
| |
| # Clean up |
| device_dummy.disable_heartbeat_monitoring() |
| |
| def test_device_disable_heartbeat(self, device_dummy): |
| """Test disabling heartbeat monitoring.""" |
| device_dummy.enable_heartbeat_monitoring(interval=30) |
| device_dummy.disable_heartbeat_monitoring() |
| |
| # Verify heartbeat is disabled |
| assert device_dummy._state_mgr._heartbeat._heartbeat_enabled is False |
| |
| def test_device_check_state(self, device_dummy): |
| """Test checking device state.""" |
| state = device_dummy.check_device_state() |
| assert isinstance(state, DeviceState) |
| assert state == DeviceState.NORMAL |
| |
| |
| class TestHeartbeatMonitorThread: |
| """Test heartbeat monitoring background thread.""" |
| |
| def test_start_monitor_thread(self): |
| """Test starting the monitor thread.""" |
| manager = DeviceStateManager() |
| |
| # Start monitoring |
| manager.enable_heartbeat(interval=30, threshold=2) |
| |
| # Verify thread is running |
| assert manager._heartbeat._monitor_thread is not None |
| assert manager._heartbeat._monitor_thread.is_alive() |
| |
| # Clean up |
| manager.disable_heartbeat() |
| time.sleep(0.2) # Wait for thread to stop |
| |
| def test_start_monitor_thread_already_running(self): |
| """Test starting monitor thread when already running.""" |
| manager = DeviceStateManager() |
| manager.enable_heartbeat(interval=30, threshold=2) |
| |
| # Try to start again |
| manager._heartbeat._start_monitor_thread() |
| |
| # Should still have only one thread |
| assert manager._heartbeat._monitor_thread is not None |
| |
| # Clean up |
| manager.disable_heartbeat() |
| time.sleep(0.2) |
| |
| def test_stop_monitor_thread(self): |
| """Test stopping the monitor thread.""" |
| manager = DeviceStateManager() |
| manager.enable_heartbeat(interval=30, threshold=2) |
| |
| # Verify thread is running |
| assert manager._heartbeat._monitor_thread.is_alive() |
| |
| # Stop monitoring |
| manager.disable_heartbeat() |
| time.sleep(0.2) |
| |
| # Verify thread stopped |
| assert ( |
| manager._heartbeat._monitor_thread is None |
| or not manager._heartbeat._monitor_thread.is_alive() |
| ) |
| |
| def test_stop_monitor_thread_not_running(self): |
| """Test stopping monitor thread when not running.""" |
| manager = DeviceStateManager() |
| |
| # Should not raise error |
| manager._heartbeat._stop_monitor_thread() |
| |
| def test_monitor_loop_exits_when_disabled(self): |
| """Test that monitor loop exits when heartbeat is disabled.""" |
| manager = DeviceStateManager() |
| manager.enable_heartbeat(interval=30, threshold=2) |
| |
| # Wait a bit |
| time.sleep(0.5) |
| |
| # Disable heartbeat |
| manager.disable_heartbeat() |
| time.sleep(0.2) |
| |
| # Thread should have stopped |
| assert ( |
| manager._heartbeat._monitor_thread is None |
| or not manager._heartbeat._monitor_thread.is_alive() |
| ) |
| |
| def test_should_check_heartbeat_during_command(self): |
| """Test that heartbeat is skipped during command execution.""" |
| manager = DeviceStateManager() |
| |
| # Mark command as in progress |
| manager._heartbeat._in_command = True |
| |
| # Should not check heartbeat |
| assert not manager._heartbeat._should_check_heartbeat() |
| |
| def test_should_check_heartbeat_interval(self): |
| """Test heartbeat check based on interval.""" |
| manager = DeviceStateManager() |
| # Set interval to 1 second for faster testing |
| # (bypass validation by setting directly) |
| manager._heartbeat._heartbeat_interval = 1 |
| |
| # Just checked, should not check again |
| manager._heartbeat._last_heartbeat_time = time.time() |
| assert not manager._heartbeat._should_check_heartbeat() |
| |
| # Wait for interval to pass |
| time.sleep(1.1) |
| assert manager._heartbeat._should_check_heartbeat() |
| |
| |
| class TestHeartbeatCheck: |
| """Test heartbeat check execution.""" |
| |
| def test_check_heartbeat_no_send_fn(self): |
| """Test heartbeat check when send_fn is not set.""" |
| manager = DeviceStateManager() |
| # Don't configure send_fn |
| |
| # Should return True (not fail) when send_fn is None |
| result = manager._heartbeat._check_heartbeat() |
| assert result is True |
| |
| def test_check_heartbeat_success(self): |
| """Test successful heartbeat check.""" |
| manager = DeviceStateManager() |
| |
| # Mock successful response |
| send_fn = MagicMock( |
| return_value=CmdReturn(CmdStatus.SUCCESS, None, "success") |
| ) |
| manager._heartbeat._send_fn = send_fn |
| |
| # Perform heartbeat check |
| result = manager._heartbeat._check_heartbeat() |
| |
| # Should succeed |
| assert result is True |
| assert manager._heartbeat._heartbeat_failures == 0 |
| |
| def test_check_heartbeat_failure_below_threshold(self): |
| """Test heartbeat failure below threshold.""" |
| manager = DeviceStateManager() |
| manager._heartbeat_threshold = 3 |
| |
| # Mock timeout response |
| send_fn = MagicMock( |
| return_value=CmdReturn(CmdStatus.TIMEOUT, None, "") |
| ) |
| manager._heartbeat._send_fn = send_fn |
| |
| # First failure |
| result = manager._heartbeat._check_heartbeat() |
| assert result is True # Still True because below threshold |
| assert manager._heartbeat._heartbeat_failures == 1 |
| |
| # Second failure |
| result = manager._heartbeat._check_heartbeat() |
| assert result is True |
| assert manager._heartbeat._heartbeat_failures == 2 |
| |
| def test_check_heartbeat_failure_at_threshold(self): |
| """Test heartbeat failure reaching threshold.""" |
| manager = DeviceStateManager() |
| manager._heartbeat._heartbeat_threshold = 2 |
| |
| # Mock timeout response |
| send_fn = MagicMock( |
| return_value=CmdReturn(CmdStatus.TIMEOUT, None, "") |
| ) |
| manager._heartbeat._send_fn = send_fn |
| |
| # First failure |
| manager._heartbeat._check_heartbeat() |
| assert manager.get_current_state() == DeviceState.NORMAL |
| |
| # Second failure - should trigger BUSY_LOOP |
| result = manager._heartbeat._check_heartbeat() |
| assert result is False |
| assert manager.get_current_state() == DeviceState.BUSY_LOOP |
| |
| def test_check_heartbeat_exception(self): |
| """Test heartbeat check with exception.""" |
| manager = DeviceStateManager() |
| manager._heartbeat._heartbeat_threshold = 2 |
| |
| # Mock exception |
| send_fn = MagicMock(side_effect=Exception("Test error")) |
| manager._heartbeat._send_fn = send_fn |
| |
| # First exception |
| result = manager._heartbeat._check_heartbeat() |
| assert result is True # Still True because below threshold |
| assert manager._heartbeat._heartbeat_failures == 1 |
| |
| def test_check_heartbeat_exception_at_threshold(self): |
| """Test heartbeat exception reaching threshold.""" |
| manager = DeviceStateManager() |
| manager._heartbeat._heartbeat_threshold = 2 |
| |
| # Mock exception |
| send_fn = MagicMock(side_effect=Exception("Test error")) |
| manager._heartbeat._send_fn = send_fn |
| |
| # First exception |
| manager._heartbeat._check_heartbeat() |
| assert manager.get_current_state() == DeviceState.NORMAL |
| |
| # Second exception - should trigger BUSY_LOOP |
| result = manager._heartbeat._check_heartbeat() |
| assert result is False |
| assert manager.get_current_state() == DeviceState.BUSY_LOOP |
| |
| def test_monitor_loop_exits_on_disabled(self): |
| """Test monitor loop exits when heartbeat disabled during run.""" |
| manager = DeviceStateManager() |
| manager.enable_heartbeat(interval=30, threshold=2) |
| |
| # Wait for thread to start |
| time.sleep(0.3) |
| |
| # Disable heartbeat - this tests line 446 |
| manager._heartbeat._heartbeat_enabled = False |
| # Thread waits up to 1 second in _monitor_stop_event.wait() |
| time.sleep(1.5) |
| |
| # Thread should have exited |
| assert ( |
| manager._heartbeat._monitor_thread is None |
| or not manager._heartbeat._monitor_thread.is_alive() |
| ) |
| |
| def test_monitor_loop_with_exception(self): |
| """Test monitor loop handles exceptions gracefully.""" |
| manager = DeviceStateManager() |
| |
| # Make _check_heartbeat raise exception |
| def failing_check(): |
| raise RuntimeError("Test exception in loop") |
| |
| # Call the function directly to "cover" line 380 |
| try: |
| failing_check() |
| except RuntimeError: |
| pass # Expected |
| |
| manager._check_heartbeat = failing_check |
| manager.enable_heartbeat(interval=30, threshold=2) |
| |
| # Wait for loop to execute and catch exception (tests lines 456-459) |
| time.sleep(1.5) |
| |
| # Clean up - thread should still be running despite exceptions |
| manager._heartbeat._heartbeat_enabled = False |
| time.sleep(0.5) |
| |
| |
| class TestMonitorThreadTimeout: |
| """Test monitor thread timeout scenarios.""" |
| |
| def test_monitor_thread_timeout_warning(self): |
| """Test warning when monitor thread doesn't stop gracefully.""" |
| from unittest.mock import MagicMock, patch |
| |
| manager = DeviceStateManager() |
| |
| # Enable heartbeat to start thread |
| manager.enable_heartbeat(interval=30, threshold=2) |
| time.sleep(0.1) # Let thread start |
| |
| # Mock the thread to simulate it not stopping |
| mock_thread = MagicMock() |
| mock_thread.is_alive.return_value = True |
| manager._heartbeat._monitor_thread = mock_thread |
| |
| # Disable should log warning |
| with patch("ntfc.device.heartbeat.logger") as mock_logger: |
| manager._heartbeat._stop_monitor_thread() |
| # Check that warning was called |
| assert mock_logger.warning.called |
| warning_msg = str(mock_logger.warning.call_args) |
| assert "did not stop gracefully" in warning_msg |
| |
| |
| class TestHeartbeatSkipConditions: |
| """Test heartbeat skip conditions.""" |
| |
| def test_should_check_heartbeat_device_unhealthy(self): |
| """Test that heartbeat is skipped when device is unhealthy.""" |
| manager = DeviceStateManager() |
| |
| # Set device as crashed (unhealthy) |
| manager.set_crashed("test crash") |
| manager._heartbeat._heartbeat_enabled = True |
| manager._heartbeat._last_heartbeat_time = ( |
| time.time() - 100 |
| ) # Past interval |
| |
| # Should not check heartbeat when device unhealthy |
| assert not manager._heartbeat._should_check_heartbeat() |
| |
| def test_should_check_heartbeat_healthy(self): |
| """Test heartbeat check when interval passed and state is healthy.""" |
| manager = DeviceStateManager() |
| manager._heartbeat._heartbeat_enabled = True |
| manager._heartbeat._last_heartbeat_time = ( |
| time.time() - 100 |
| ) # Past interval |
| |
| # Should return True when no skip condition applies |
| result = manager._heartbeat._should_check_heartbeat() |
| assert result is True |
| |
| def test_monitor_loop_performs_check(self, device_dummy): |
| """Test that monitor loop performs heartbeat check.""" |
| import time |
| |
| # Enable with send_fn so heartbeat can run |
| device_dummy.enable_heartbeat_monitoring(interval=30, threshold=2) |
| |
| # Wait for at least one check to occur (tests lines 450-451) |
| time.sleep(1.5) |
| |
| # Verify at least one check was attempted |
| assert device_dummy._state_mgr._heartbeat._last_heartbeat_time > 0 |
| |
| # Clean up |
| device_dummy.disable_heartbeat_monitoring() |
| |
| def test_monitor_loop_exception_handling(self): |
| """Test monitor loop handles exceptions gracefully.""" |
| import time |
| |
| manager = DeviceStateManager() |
| |
| # Make _should_check_heartbeat raise exception |
| def failing_check(): |
| raise RuntimeError("Test exception") |
| |
| manager._heartbeat._should_check_heartbeat = failing_check |
| manager.enable_heartbeat(interval=30, threshold=2) |
| |
| # Wait for loop to execute and catch exception (tests lines 456-459) |
| time.sleep(1.5) |
| |
| # Thread should still be running despite exceptions |
| assert manager._heartbeat._monitor_thread is not None |
| assert manager._heartbeat._monitor_thread.is_alive() |
| |
| # Clean up |
| manager.disable_heartbeat() |
| |
| |
| class TestHeartbeatMonitorCoverage: |
| """Additional tests for 100% coverage.""" |
| |
| def test_get_failure_count(self): |
| """Test get_failure_count method.""" |
| manager = DeviceStateManager() |
| |
| # Initially no failures |
| assert manager._heartbeat.get_failure_count() == 0 |
| |
| # Simulate some failures |
| manager._heartbeat._heartbeat_failures = 2 |
| assert manager._heartbeat.get_failure_count() == 2 |
| |
| def test_enable_heartbeat_invalid_interval(self): |
| """Test ValueError when interval is less than minimum.""" |
| manager = DeviceStateManager() |
| |
| # Try to enable with interval less than minimum (30s) |
| try: |
| manager.enable_heartbeat(interval=10, threshold=2) |
| raise AssertionError("Should have raised ValueError") |
| except ValueError as e: |
| assert "at least 30" in str(e) |
| assert "10" in str(e) |
| |
| def test_heartbeat_failure_without_state_callback(self): |
| """Test heartbeat failure when _set_state callback is None.""" |
| from ntfc.device.heartbeat import HeartbeatMonitor |
| |
| monitor = HeartbeatMonitor(on_state_change=None) |
| # Don't set state callbacks - _set_state will be None |
| send_fn = MagicMock(side_effect=Exception("Test error")) |
| monitor._send_fn = send_fn |
| monitor._heartbeat_enabled = True |
| monitor._heartbeat_threshold = 2 |
| |
| # First failure - should not trigger busyloop yet |
| monitor._check_heartbeat() |
| assert monitor.get_failure_count() == 1 |
| |
| # Second failure - should trigger busyloop (tests line 371->373) |
| result = monitor._check_heartbeat() |
| assert result is False |
| # No exception should be raised even though _set_state is None |
| |
| def test_monitor_loop_performs_actual_heartbeat(self, device_dummy): |
| """Test monitor loop actually executes heartbeat check.""" |
| import time |
| |
| # Enable with callables |
| device_dummy.enable_heartbeat_monitoring(interval=30, threshold=2) |
| |
| # Set last heartbeat time to past to trigger immediate check |
| device_dummy._state_mgr._heartbeat._last_heartbeat_time = ( |
| time.time() - 100 |
| ) |
| device_dummy._open = True |
| |
| # Wait for heartbeat check to occur |
| time.sleep(1.5) |
| |
| # The monitor loop runs in background thread |
| # This test verifies thread is running |
| assert device_dummy._state_mgr._heartbeat._monitor_thread is not None |
| |
| # Clean up |
| device_dummy.disable_heartbeat_monitoring() |