157 lines
5.8 KiB
Python
157 lines
5.8 KiB
Python
from __future__ import annotations
|
|
|
|
import threading
|
|
import time
|
|
import unittest
|
|
|
|
from app.config.models import InfinityMirrorConfig
|
|
from app.core.types import PreviewFrame, RGBColor
|
|
from app.output.base import OutputBackend, OutputResult
|
|
from app.output.manager import OutputManager
|
|
|
|
|
|
def _frame(pattern_id: str) -> PreviewFrame:
|
|
black = RGBColor.black()
|
|
return PreviewFrame(
|
|
timestamp=0.0,
|
|
pattern_id=pattern_id,
|
|
utility_mode="none",
|
|
background_start=black,
|
|
background_end=black,
|
|
tiles={},
|
|
)
|
|
|
|
|
|
class RecordingBackend(OutputBackend):
|
|
backend_id = "recording"
|
|
display_name = "Recording"
|
|
supports_live_output = True
|
|
|
|
def __init__(self, result: OutputResult | None = None, send_gate: threading.Event | None = None) -> None:
|
|
self.result = result if result is not None else OutputResult(ok=True, message="", packets_sent=1, device_count=1)
|
|
self.send_gate = send_gate
|
|
self.sent_patterns: list[str] = []
|
|
self.send_event = threading.Event()
|
|
self.start_count = 0
|
|
self.stop_count = 0
|
|
self._lock = threading.Lock()
|
|
|
|
def start(self) -> None:
|
|
with self._lock:
|
|
self.start_count += 1
|
|
|
|
def stop(self) -> None:
|
|
with self._lock:
|
|
self.stop_count += 1
|
|
|
|
def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
|
|
if self.send_gate is not None:
|
|
self.send_gate.wait(0.5)
|
|
with self._lock:
|
|
self.sent_patterns.append(frame.pattern_id)
|
|
self.send_event.set()
|
|
return self.result
|
|
|
|
|
|
class TimingBackend(OutputBackend):
|
|
backend_id = "timing"
|
|
display_name = "Timing"
|
|
supports_live_output = True
|
|
|
|
def __init__(self, delay_s: float) -> None:
|
|
self.delay_s = delay_s
|
|
self.sent_at: list[float] = []
|
|
self.send_event = threading.Event()
|
|
|
|
def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
|
|
time.sleep(self.delay_s)
|
|
self.sent_at.append(time.perf_counter())
|
|
self.send_event.set()
|
|
return OutputResult(ok=True, message="", packets_sent=1, device_count=1)
|
|
|
|
|
|
class OutputManagerTests(unittest.TestCase):
|
|
def tearDown(self) -> None:
|
|
manager = getattr(self, "manager", None)
|
|
if manager is not None:
|
|
manager.shutdown()
|
|
|
|
def test_latest_frame_overwrites_stale_pending_frame(self) -> None:
|
|
self.manager = OutputManager(target_fps=20.0)
|
|
backend = RecordingBackend()
|
|
self.manager.backends[backend.backend_id] = backend
|
|
self.manager.set_active_backend(backend.backend_id)
|
|
self.manager.set_output_enabled(True)
|
|
|
|
self.manager.submit_frame(_frame("solid"))
|
|
self.manager.submit_frame(_frame("sparkle"))
|
|
self.manager.update_config(InfinityMirrorConfig())
|
|
|
|
self.assertTrue(backend.send_event.wait(0.5))
|
|
diagnostics = self.manager.diagnostics_snapshot()
|
|
|
|
self.assertEqual(backend.sent_patterns[0], "sparkle")
|
|
self.assertEqual(diagnostics.stale_frame_drops, 1)
|
|
|
|
def test_diagnostics_report_selected_backend_and_packet_counts(self) -> None:
|
|
self.manager = OutputManager(target_fps=25.0)
|
|
backend = RecordingBackend(result=OutputResult(ok=True, message="ok", packets_sent=3, device_count=2))
|
|
self.manager.backends[backend.backend_id] = backend
|
|
self.manager.set_active_backend(backend.backend_id)
|
|
self.manager.update_config(InfinityMirrorConfig())
|
|
self.manager.set_output_enabled(True)
|
|
|
|
self.manager.submit_frame(_frame("solid"))
|
|
self.assertTrue(backend.send_event.wait(0.5))
|
|
|
|
diagnostics = self.manager.diagnostics_snapshot()
|
|
self.assertEqual(diagnostics.backend_id, backend.backend_id)
|
|
self.assertEqual(diagnostics.backend_name, backend.display_name)
|
|
self.assertTrue(diagnostics.worker_running)
|
|
self.assertEqual(diagnostics.packets_last_frame, 3)
|
|
self.assertEqual(diagnostics.devices_last_frame, 2)
|
|
self.assertGreaterEqual(diagnostics.frames_sent, 1)
|
|
|
|
def test_send_failures_are_exposed_as_status_messages(self) -> None:
|
|
self.manager = OutputManager(target_fps=25.0)
|
|
backend = RecordingBackend(result=OutputResult(ok=False, message="send failed", packets_sent=0, device_count=0))
|
|
self.manager.backends[backend.backend_id] = backend
|
|
self.manager.set_active_backend(backend.backend_id)
|
|
self.manager.update_config(InfinityMirrorConfig())
|
|
self.manager.set_output_enabled(True)
|
|
|
|
self.manager.submit_frame(_frame("solid"))
|
|
self.assertTrue(backend.send_event.wait(0.5))
|
|
|
|
self.assertIn("send failed", self.manager.drain_status_messages())
|
|
self.assertEqual(self.manager.diagnostics_snapshot().send_failures, 1)
|
|
|
|
def test_worker_keeps_target_cadence_instead_of_adding_send_time_to_every_interval(self) -> None:
|
|
self.manager = OutputManager(target_fps=25.0)
|
|
backend = TimingBackend(delay_s=0.012)
|
|
self.manager.backends[backend.backend_id] = backend
|
|
self.manager.set_active_backend(backend.backend_id)
|
|
self.manager.update_config(InfinityMirrorConfig())
|
|
self.manager.set_output_enabled(True)
|
|
|
|
started_at = time.perf_counter()
|
|
while time.perf_counter() - started_at < 0.75:
|
|
self.manager.submit_frame(_frame("solid"))
|
|
time.sleep(0.01)
|
|
|
|
self.assertTrue(backend.send_event.wait(0.5))
|
|
time.sleep(0.2)
|
|
|
|
intervals = [later - earlier for earlier, later in zip(backend.sent_at, backend.sent_at[1:])]
|
|
self.assertGreaterEqual(len(intervals), 8)
|
|
|
|
average_interval = sum(intervals) / len(intervals)
|
|
self.assertLess(average_interval, 0.05)
|
|
|
|
diagnostics = self.manager.diagnostics_snapshot()
|
|
self.assertLessEqual(diagnostics.send_budget_misses, 1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|