# Source listing: RFP_Infinity-Vis/tests/test_output_manager.py (Python, 157 lines, 5.8 KiB)

from __future__ import annotations
import threading
import time
import unittest
from app.config.models import InfinityMirrorConfig
from app.core.types import PreviewFrame, RGBColor
from app.output.base import OutputBackend, OutputResult
from app.output.manager import OutputManager
def _frame(pattern_id: str) -> PreviewFrame:
    """Build a minimal preview frame labelled with ``pattern_id``.

    The frame has a zero timestamp, no utility mode, no tiles, and the same
    black colour object for both background endpoints.  The tests in this
    module only look at ``pattern_id`` to tell frames apart.
    """
    background = RGBColor.black()
    frame_fields = {
        "timestamp": 0.0,
        "pattern_id": pattern_id,
        "utility_mode": "none",
        "background_start": background,
        "background_end": background,
        "tiles": {},
    }
    return PreviewFrame(**frame_fields)
class RecordingBackend(OutputBackend):
    """Test double that records every frame it is asked to send.

    Attributes:
        result: The ``OutputResult`` returned from every ``send_frame`` call.
        send_gate: Optional event; when given, ``send_frame`` waits on it for
            at most 0.5 seconds before recording the frame.
        sent_patterns: ``pattern_id`` of each frame passed to ``send_frame``,
            in call order.
        send_event: Set each time a frame has been recorded.
        start_count: Number of ``start`` calls seen so far.
        stop_count: Number of ``stop`` calls seen so far.
    """

    backend_id = "recording"
    display_name = "Recording"
    supports_live_output = True

    def __init__(
        self,
        result: OutputResult | None = None,
        send_gate: threading.Event | None = None,
    ) -> None:
        """Create the backend, defaulting to a successful one-packet result."""
        if result is None:
            result = OutputResult(ok=True, message="", packets_sent=1, device_count=1)
        self.result = result
        self.send_gate = send_gate
        self.sent_patterns: list[str] = []
        self.send_event = threading.Event()
        self.start_count = 0
        self.stop_count = 0
        # Guards the counters and ``sent_patterns``.
        self._lock = threading.Lock()

    def start(self) -> None:
        """Count one ``start`` call."""
        with self._lock:
            self.start_count = self.start_count + 1

    def stop(self) -> None:
        """Count one ``stop`` call."""
        with self._lock:
            self.stop_count = self.stop_count + 1

    def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
        """Record the frame's pattern id, signal ``send_event``, return ``result``.

        ``config`` is accepted to satisfy the backend signature but is not used.
        """
        gate = self.send_gate
        if gate is not None:
            # Bounded wait: the outcome is deliberately ignored so a test that
            # never opens the gate cannot hang this call.
            gate.wait(0.5)
        with self._lock:
            self.sent_patterns.append(frame.pattern_id)
        # The event is set only after the pattern has been recorded.
        self.send_event.set()
        return self.result
class TimingBackend(OutputBackend):
    """Test double that takes a fixed time per send and timestamps each one.

    Attributes:
        delay_s: Seconds slept inside every ``send_frame`` call.
        sent_at: ``time.perf_counter()`` reading taken after each sleep, in
            call order.
        send_event: Set each time a send has been timestamped.
    """

    backend_id = "timing"
    display_name = "Timing"
    supports_live_output = True

    def __init__(self, delay_s: float) -> None:
        """Create the backend with the per-send delay in seconds."""
        self.delay_s = delay_s
        self.sent_at: list[float] = []
        self.send_event = threading.Event()

    def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
        """Sleep for ``delay_s``, record the completion time, report success.

        Neither ``config`` nor ``frame`` is inspected.
        """
        time.sleep(self.delay_s)
        finished_at = time.perf_counter()
        self.sent_at.append(finished_at)
        self.send_event.set()
        return OutputResult(ok=True, message="", packets_sent=1, device_count=1)
class OutputManagerTests(unittest.TestCase):
    """Tests for ``OutputManager`` driven through fake output backends.

    The fake backends set ``send_event`` from inside ``send_frame``, that is,
    before the call has returned to the manager.  Anything the manager derives
    from the returned ``OutputResult`` (packet/device counts, failure counters,
    status messages) may therefore not be recorded yet when the test thread
    wakes up.  Diagnostics and status messages are read through ``_poll`` with
    a short bounded wait instead of a single immediate read; the assertions
    themselves are unchanged.
    """

    def tearDown(self) -> None:
        """Shut down the manager created by the test, if one was created."""
        manager = getattr(self, "manager", None)
        if manager is not None:
            manager.shutdown()

    def _poll(self, produce, accept, timeout_s: float = 0.5, interval_s: float = 0.005):
        """Call ``produce()`` until ``accept(value)`` is truthy or time runs out.

        Args:
            produce: Zero-argument callable returning the value to inspect.
            accept: One-argument callable; a truthy result ends the polling.
            timeout_s: Upper bound on the total wait, in seconds.
            interval_s: Sleep between attempts, in seconds.

        Returns:
            The last value returned by ``produce``.  On timeout this is a value
            that was not accepted, so the caller's own assertion reports the
            failure.
        """
        deadline = time.monotonic() + timeout_s
        while True:
            value = produce()
            if accept(value) or time.monotonic() >= deadline:
                return value
            time.sleep(interval_s)

    def test_latest_frame_overwrites_stale_pending_frame(self) -> None:
        """Only the newest pending frame is sent and one stale drop is counted."""
        self.manager = OutputManager(target_fps=20.0)
        backend = RecordingBackend()
        self.manager.backends[backend.backend_id] = backend
        self.manager.set_active_backend(backend.backend_id)
        self.manager.set_output_enabled(True)
        self.manager.submit_frame(_frame("solid"))
        self.manager.submit_frame(_frame("sparkle"))
        # NOTE(review): the config is supplied only after both submits.
        # Presumably the worker cannot send without a config, which keeps the
        # first frame pending long enough to be overwritten -- keep this order.
        self.manager.update_config(InfinityMirrorConfig())

        self.assertTrue(backend.send_event.wait(0.5))
        self.assertEqual(backend.sent_patterns[0], "sparkle")

        diagnostics = self._poll(
            self.manager.diagnostics_snapshot,
            lambda snapshot: snapshot.frames_sent >= 1,
        )
        self.assertEqual(diagnostics.stale_frame_drops, 1)

    def test_diagnostics_report_selected_backend_and_packet_counts(self) -> None:
        """Diagnostics expose the active backend and the last send's counts."""
        self.manager = OutputManager(target_fps=25.0)
        backend = RecordingBackend(
            result=OutputResult(ok=True, message="ok", packets_sent=3, device_count=2)
        )
        self.manager.backends[backend.backend_id] = backend
        self.manager.set_active_backend(backend.backend_id)
        self.manager.update_config(InfinityMirrorConfig())
        self.manager.set_output_enabled(True)
        self.manager.submit_frame(_frame("solid"))

        self.assertTrue(backend.send_event.wait(0.5))

        # Wait for the manager to account for the send before inspecting the
        # counters that come from the returned OutputResult.
        diagnostics = self._poll(
            self.manager.diagnostics_snapshot,
            lambda snapshot: snapshot.frames_sent >= 1,
        )
        self.assertEqual(diagnostics.backend_id, backend.backend_id)
        self.assertEqual(diagnostics.backend_name, backend.display_name)
        self.assertTrue(diagnostics.worker_running)
        self.assertEqual(diagnostics.packets_last_frame, 3)
        self.assertEqual(diagnostics.devices_last_frame, 2)
        self.assertGreaterEqual(diagnostics.frames_sent, 1)

    def test_send_failures_are_exposed_as_status_messages(self) -> None:
        """A failed send yields its message as a status message and is counted."""
        self.manager = OutputManager(target_fps=25.0)
        backend = RecordingBackend(
            result=OutputResult(ok=False, message="send failed", packets_sent=0, device_count=0)
        )
        self.manager.backends[backend.backend_id] = backend
        self.manager.set_active_backend(backend.backend_id)
        self.manager.update_config(InfinityMirrorConfig())
        self.manager.set_output_enabled(True)
        self.manager.submit_frame(_frame("solid"))

        self.assertTrue(backend.send_event.wait(0.5))

        # Draining is destructive, so each poll attempt inspects its own batch
        # and polling stops at the first batch that contains the message.
        messages = self._poll(
            self.manager.drain_status_messages,
            lambda batch: "send failed" in batch,
        )
        self.assertIn("send failed", messages)

        diagnostics = self._poll(
            self.manager.diagnostics_snapshot,
            lambda snapshot: snapshot.send_failures >= 1,
        )
        self.assertEqual(diagnostics.send_failures, 1)

    def test_worker_keeps_target_cadence_instead_of_adding_send_time_to_every_interval(self) -> None:
        """Average spacing between sends stays below 50 ms despite 12 ms sends."""
        self.manager = OutputManager(target_fps=25.0)
        backend = TimingBackend(delay_s=0.012)
        self.manager.backends[backend.backend_id] = backend
        self.manager.set_active_backend(backend.backend_id)
        self.manager.update_config(InfinityMirrorConfig())
        self.manager.set_output_enabled(True)

        # Feed frames faster than the target rate for 0.75 s so the worker
        # always has something to send.
        started_at = time.perf_counter()
        while time.perf_counter() - started_at < 0.75:
            self.manager.submit_frame(_frame("solid"))
            time.sleep(0.01)

        self.assertTrue(backend.send_event.wait(0.5))
        # Give an in-flight send time to finish before reading the timestamps.
        time.sleep(0.2)

        intervals = [
            later - earlier
            for earlier, later in zip(backend.sent_at, backend.sent_at[1:])
        ]
        self.assertGreaterEqual(len(intervals), 8)
        average_interval = sum(intervals) / len(intervals)
        self.assertLess(average_interval, 0.05)

        diagnostics = self.manager.diagnostics_snapshot()
        self.assertLessEqual(diagnostics.send_budget_misses, 1)
if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    unittest.main()