from __future__ import annotations import threading import time import unittest from app.config.models import InfinityMirrorConfig from app.core.types import PreviewFrame, RGBColor from app.output.base import OutputBackend, OutputResult from app.output.manager import OutputManager def _frame(pattern_id: str) -> PreviewFrame: black = RGBColor.black() return PreviewFrame( timestamp=0.0, pattern_id=pattern_id, utility_mode="none", background_start=black, background_end=black, tiles={}, ) class RecordingBackend(OutputBackend): backend_id = "recording" display_name = "Recording" supports_live_output = True def __init__(self, result: OutputResult | None = None, send_gate: threading.Event | None = None) -> None: self.result = result if result is not None else OutputResult(ok=True, message="", packets_sent=1, device_count=1) self.send_gate = send_gate self.sent_patterns: list[str] = [] self.send_event = threading.Event() self.start_count = 0 self.stop_count = 0 self._lock = threading.Lock() def start(self) -> None: with self._lock: self.start_count += 1 def stop(self) -> None: with self._lock: self.stop_count += 1 def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult: if self.send_gate is not None: self.send_gate.wait(0.5) with self._lock: self.sent_patterns.append(frame.pattern_id) self.send_event.set() return self.result class TimingBackend(OutputBackend): backend_id = "timing" display_name = "Timing" supports_live_output = True def __init__(self, delay_s: float) -> None: self.delay_s = delay_s self.sent_at: list[float] = [] self.send_event = threading.Event() def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult: time.sleep(self.delay_s) self.sent_at.append(time.perf_counter()) self.send_event.set() return OutputResult(ok=True, message="", packets_sent=1, device_count=1) class OutputManagerTests(unittest.TestCase): def tearDown(self) -> None: manager = getattr(self, "manager", None) if manager is not None: manager.shutdown() def test_latest_frame_overwrites_stale_pending_frame(self) -> None: self.manager = OutputManager(target_fps=20.0) backend = RecordingBackend() self.manager.backends[backend.backend_id] = backend self.manager.set_active_backend(backend.backend_id) self.manager.set_output_enabled(True) self.manager.submit_frame(_frame("solid")) self.manager.submit_frame(_frame("sparkle")) self.manager.update_config(InfinityMirrorConfig()) self.assertTrue(backend.send_event.wait(0.5)) diagnostics = self.manager.diagnostics_snapshot() self.assertEqual(backend.sent_patterns[0], "sparkle") self.assertEqual(diagnostics.stale_frame_drops, 1) def test_diagnostics_report_selected_backend_and_packet_counts(self) -> None: self.manager = OutputManager(target_fps=25.0) backend = RecordingBackend(result=OutputResult(ok=True, message="ok", packets_sent=3, device_count=2)) self.manager.backends[backend.backend_id] = backend self.manager.set_active_backend(backend.backend_id) self.manager.update_config(InfinityMirrorConfig()) self.manager.set_output_enabled(True) self.manager.submit_frame(_frame("solid")) self.assertTrue(backend.send_event.wait(0.5)) diagnostics = self.manager.diagnostics_snapshot() self.assertEqual(diagnostics.backend_id, backend.backend_id) self.assertEqual(diagnostics.backend_name, backend.display_name) self.assertTrue(diagnostics.worker_running) self.assertEqual(diagnostics.packets_last_frame, 3) self.assertEqual(diagnostics.devices_last_frame, 2) self.assertGreaterEqual(diagnostics.frames_sent, 1) def test_send_failures_are_exposed_as_status_messages(self) -> None: self.manager = OutputManager(target_fps=25.0) backend = RecordingBackend(result=OutputResult(ok=False, message="send failed", packets_sent=0, device_count=0)) self.manager.backends[backend.backend_id] = backend self.manager.set_active_backend(backend.backend_id) self.manager.update_config(InfinityMirrorConfig()) self.manager.set_output_enabled(True) self.manager.submit_frame(_frame("solid")) self.assertTrue(backend.send_event.wait(0.5)) self.assertIn("send failed", self.manager.drain_status_messages()) self.assertEqual(self.manager.diagnostics_snapshot().send_failures, 1) def test_worker_keeps_target_cadence_instead_of_adding_send_time_to_every_interval(self) -> None: self.manager = OutputManager(target_fps=25.0) backend = TimingBackend(delay_s=0.012) self.manager.backends[backend.backend_id] = backend self.manager.set_active_backend(backend.backend_id) self.manager.update_config(InfinityMirrorConfig()) self.manager.set_output_enabled(True) started_at = time.perf_counter() while time.perf_counter() - started_at < 0.75: self.manager.submit_frame(_frame("solid")) time.sleep(0.01) self.assertTrue(backend.send_event.wait(0.5)) time.sleep(0.2) intervals = [later - earlier for earlier, later in zip(backend.sent_at, backend.sent_at[1:])] self.assertGreaterEqual(len(intervals), 8) average_interval = sum(intervals) / len(intervals) self.assertLess(average_interval, 0.05) diagnostics = self.manager.diagnostics_snapshot() self.assertLessEqual(diagnostics.send_budget_misses, 1) if __name__ == "__main__": unittest.main()