from __future__ import annotations

from collections import deque
import threading
import time

from app.config.models import InfinityMirrorConfig
from app.core.types import PreviewFrame

from .artnet import ArtnetOutputBackend
from .base import ControllerMetrics, OutputBackend, OutputDiagnostics, OutputResult
from .ddp import DdpOutputBackend
from .preview import PreviewOutputBackend

DEFAULT_OUTPUT_FPS = 40.0
MIN_OUTPUT_FPS = 1.0
MAX_OUTPUT_FPS = 60.0

# Seconds between controller telemetry polls in ``OutputManager._telemetry_loop``.
# NOTE(review): the loop referenced this name without it being defined or
# imported in this module (NameError on the first completed poll). 2.0 s is a
# conservative placeholder -- confirm the intended cadence.
CONTROLLER_TELEMETRY_INTERVAL_S = 2.0

# Seconds the output worker waits after a backend fails to start before it
# tries again, so a persistent failure does not turn into a busy loop.
_BACKEND_START_RETRY_DELAY_S = 1.0


class OutputManager:
    """Send the most recently submitted frame to the active output backend.

    The manager owns one instance of each registered backend and a daemon
    worker thread. Callers hand over configuration snapshots and frames; the
    worker sends the latest frame to the active backend at the clamped target
    rate and records send statistics that ``diagnostics_snapshot`` reports.

    Mutable state is guarded by a single lock, which is also the lock of the
    condition used to wake the worker. Methods whose name ends in ``_locked``,
    as well as ``_queue_status_message`` and ``_record_send_fps``, must be
    called with that lock held.
    """

    def __init__(self, target_fps: float = DEFAULT_OUTPUT_FPS) -> None:
        """Create the backends and initialise all counters.

        Args:
            target_fps: Desired send rate; clamped to
                ``[MIN_OUTPUT_FPS, MAX_OUTPUT_FPS]``.
        """
        self.backends: dict[str, OutputBackend] = {
            PreviewOutputBackend.backend_id: PreviewOutputBackend(),
            DdpOutputBackend.backend_id: DdpOutputBackend(),
            ArtnetOutputBackend.backend_id: ArtnetOutputBackend(),
        }
        self.active_backend_id = PreviewOutputBackend.backend_id
        self.output_enabled = False
        self._target_fps = self._clamp_target_fps(target_fps)

        # Latest inputs handed to the worker.
        self._config_snapshot: InfinityMirrorConfig | None = None
        self._config_source_id: int | None = None
        self._latest_frame: PreviewFrame | None = None
        self._latest_frame_version = 0
        self._sent_frame_version = 0

        # Send statistics surfaced through diagnostics_snapshot().
        self._frames_submitted = 0
        self._frames_sent = 0
        self._stale_frame_drops = 0
        self._send_failures = 0
        self._packets_last_frame = 0
        self._devices_last_frame = 0
        self._packets_sent_total = 0
        self._last_send_duration_s = 0.0
        self._send_budget_misses = 0
        self._last_schedule_slip_s = 0.0
        self._last_result_message = ""

        # Rolling window used by _record_send_fps(); 0.0 means "not started".
        self._send_window_started_at = 0.0
        self._send_window_count = 0
        self._send_fps = 0.0

        self._controller_metrics = ControllerMetrics(
            source="Controller-side FPS polling disabled during live output for stability.",
        )
        self._pending_messages: deque[str] = deque()
        self._last_queued_message = ""

        # One lock guards all state; the condition shares it to wake workers.
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)
        self._worker_thread: threading.Thread | None = None
        self._worker_stop_requested = False
        self._telemetry_thread: threading.Thread | None = None
        self._telemetry_stop_requested = False

    def __del__(self) -> None:  # pragma: no cover - destructor timing is interpreter-dependent
        """Best-effort shutdown when the manager is garbage collected."""
        try:
            self.shutdown()
        except Exception:
            # Deliberate: a destructor must never raise, and attributes may be
            # missing if __init__ did not complete.
            pass

    def backend_names(self) -> list[tuple[str, str]]:
        """Return ``(backend_id, display_name)`` for every registered backend."""
        return [(backend_id, backend.display_name) for backend_id, backend in self.backends.items()]

    def set_active_backend(self, backend_id: str) -> None:
        """Select the backend the worker should send to.

        Raises:
            KeyError: If ``backend_id`` is not a registered backend.
        """
        with self._condition:
            if backend_id not in self.backends:
                raise KeyError(f"Unknown backend: {backend_id}")
            if self.active_backend_id == backend_id:
                return
            self.active_backend_id = backend_id
            # Wake the worker so it performs the switch promptly.
            self._condition.notify_all()

    def set_output_enabled(self, enabled: bool) -> None:
        """Enable or disable output; enabling starts the worker if needed."""
        with self._condition:
            enabled = bool(enabled)
            if self.output_enabled == enabled:
                return
            self.output_enabled = enabled
            if enabled:
                self._ensure_worker_locked()
            self._controller_metrics = ControllerMetrics(
                source="Controller-side FPS polling disabled during live output for stability.",
            )
            self._condition.notify_all()

    def set_target_fps(self, value: float) -> None:
        """Set the send rate, clamped to ``[MIN_OUTPUT_FPS, MAX_OUTPUT_FPS]``."""
        with self._condition:
            target_fps = self._clamp_target_fps(value)
            if abs(target_fps - self._target_fps) < 1e-9:
                return
            self._target_fps = target_fps
            self._condition.notify_all()

    def target_fps(self) -> float:
        """Return the current (already clamped) target send rate."""
        with self._lock:
            return self._target_fps

    def active_backend(self) -> OutputBackend:
        """Return the backend instance that is currently selected."""
        with self._lock:
            return self.backends[self.active_backend_id]

    def update_config(self, config: InfinityMirrorConfig) -> None:
        """Store a clone of ``config`` for the worker.

        The identity of the object passed in is remembered so that
        ``push_frame`` can skip the clone while the caller keeps passing the
        same object.
        """
        # Clone outside the lock to keep the critical section short.
        config_snapshot = config.clone()
        with self._condition:
            self._config_snapshot = config_snapshot
            self._config_source_id = id(config)
            self._condition.notify_all()

    def submit_frame(self, frame: PreviewFrame) -> OutputResult:
        """Make ``frame`` the latest frame and wake the worker.

        A frame that replaces one the worker has not sent yet is counted as a
        stale-frame drop while output is enabled.

        Returns:
            An ``OutputResult`` with ``ok=True`` and a message that says
            whether hardware output is enabled.
        """
        with self._condition:
            if self.output_enabled and self._latest_frame_version > self._sent_frame_version:
                self._stale_frame_drops += 1
            self._latest_frame = frame
            self._latest_frame_version += 1
            self._frames_submitted += 1
            if self.output_enabled:
                self._ensure_worker_locked()
            self._condition.notify_all()
            message = "Frame submitted to output worker." if self.output_enabled else "Hardware output disabled."
        return OutputResult(ok=True, message=message)

    def push_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
        """Submit ``frame``, refreshing the config snapshot when needed.

        The snapshot is refreshed only when ``config`` is a different object
        from the one last passed to ``update_config``; in-place changes to the
        same object are not detected here.
        """
        with self._lock:
            config_source_id = self._config_source_id
        if config_source_id != id(config):
            self.update_config(config)
        return self.submit_frame(frame)

    def drain_status_messages(self) -> list[str]:
        """Return all queued status messages and clear the queue."""
        with self._lock:
            messages = list(self._pending_messages)
            self._pending_messages.clear()
        return messages

    def diagnostics_snapshot(self) -> OutputDiagnostics:
        """Return a consistent copy of the current output statistics."""
        with self._lock:
            backend = self.backends[self.active_backend_id]
            worker_running = (
                self.output_enabled
                and self._worker_thread is not None
                and self._worker_thread.is_alive()
            )
            return OutputDiagnostics(
                backend_id=self.active_backend_id,
                backend_name=backend.display_name,
                output_enabled=self.output_enabled,
                worker_running=worker_running,
                target_fps=self._target_fps,
                send_fps=self._send_fps,
                last_send_time_ms=self._last_send_duration_s * 1000.0,
                frames_submitted=self._frames_submitted,
                frames_sent=self._frames_sent,
                stale_frame_drops=self._stale_frame_drops,
                send_failures=self._send_failures,
                packets_last_frame=self._packets_last_frame,
                devices_last_frame=self._devices_last_frame,
                packets_sent_total=self._packets_sent_total,
                last_message=self._last_result_message,
                send_budget_misses=self._send_budget_misses,
                last_schedule_slip_ms=self._last_schedule_slip_s * 1000.0,
                controller_fps=self._controller_metrics.fps,
                controller_live_devices=self._controller_metrics.live_devices,
                controller_sampled_devices=self._controller_metrics.sampled_devices,
                controller_total_devices=self._controller_metrics.total_devices,
                controller_source=self._controller_metrics.source,
            )

    def shutdown(self) -> None:
        """Disable output and stop the background threads.

        Each thread is joined for at most one second; a thread that is still
        alive after that keeps its reference so a later call can retry.
        """
        thread: threading.Thread | None = None
        telemetry_thread: threading.Thread | None = None
        with self._condition:
            self.output_enabled = False
            self._worker_stop_requested = True
            self._telemetry_stop_requested = True
            thread = self._worker_thread
            telemetry_thread = self._telemetry_thread
            self._condition.notify_all()

        # Join without the lock: the threads need it to observe the stop flag.
        if thread is not None:
            thread.join(timeout=1.0)
        if telemetry_thread is not None:
            telemetry_thread.join(timeout=1.0)

        with self._condition:
            if self._worker_thread is thread and thread is not None and not thread.is_alive():
                self._worker_thread = None
            if (
                self._telemetry_thread is telemetry_thread
                and telemetry_thread is not None
                and not telemetry_thread.is_alive()
            ):
                self._telemetry_thread = None

    def _ensure_worker_locked(self) -> None:
        """Start the output worker unless one is alive (lock must be held)."""
        if self._worker_thread is not None and self._worker_thread.is_alive():
            return
        self._worker_stop_requested = False
        self._worker_thread = threading.Thread(
            target=self._worker_loop,
            name="InfinityMirrorOutputWorker",
            daemon=True,
        )
        self._worker_thread.start()

    def _ensure_telemetry_locked(self) -> None:
        """Start the telemetry worker unless one is alive (lock must be held).

        Nothing in this module calls this method as shown.
        """
        if self._telemetry_thread is not None and self._telemetry_thread.is_alive():
            return
        self._telemetry_stop_requested = False
        self._telemetry_thread = threading.Thread(
            target=self._telemetry_loop,
            name="InfinityMirrorTelemetryWorker",
            daemon=True,
        )
        self._telemetry_thread.start()

    def _worker_loop(self) -> None:
        """Worker thread body: follow the active backend and send frames.

        Each pass decides, under the lock, whether to stop the backend, switch
        backend, or send the latest frame; the backend call itself is made
        without the lock so callers are never blocked behind a send.
        """
        current_backend_id: str | None = None
        current_backend: OutputBackend | None = None
        next_send_at = time.perf_counter()
        try:
            while True:
                action = "wait"
                desired_backend_id = ""
                desired_backend: OutputBackend | None = None
                config: InfinityMirrorConfig | None = None
                frame: PreviewFrame | None = None
                frame_version = 0
                interval_s = 1.0 / DEFAULT_OUTPUT_FPS
                scheduled_send_at = next_send_at

                with self._condition:
                    while True:
                        if self._worker_stop_requested:
                            return
                        desired_backend_id = self.active_backend_id
                        desired_backend = self.backends[desired_backend_id]
                        interval_s = 1.0 / self._target_fps

                        if not self.output_enabled:
                            if current_backend is not None:
                                action = "disable_backend"
                                break
                            self._condition.wait()
                            # Restart the cadence after an idle period.
                            next_send_at = time.perf_counter()
                            continue

                        if current_backend_id != desired_backend_id or current_backend is None:
                            action = "switch_backend"
                            break

                        config = self._config_snapshot
                        frame = self._latest_frame
                        frame_version = self._latest_frame_version
                        if config is None or frame is None:
                            self._condition.wait()
                            next_send_at = time.perf_counter()
                            continue

                        wait_timeout = next_send_at - time.perf_counter()
                        if wait_timeout > 0.0:
                            # Woken early by a notify: re-evaluate everything.
                            self._condition.wait(timeout=wait_timeout)
                            continue

                        scheduled_send_at = next_send_at
                        action = "send_frame"
                        break

                if action == "disable_backend":
                    current_backend.stop()
                    current_backend = None
                    current_backend_id = None
                    next_send_at = time.perf_counter()
                    continue

                if action == "switch_backend":
                    if current_backend is not None:
                        current_backend.stop()
                    current_backend = desired_backend
                    current_backend_id = desired_backend_id
                    try:
                        current_backend.start()
                    except OSError as exc:
                        failure_message = f"{current_backend.display_name} start failed: {exc}"
                        current_backend = None
                        current_backend_id = None
                        # Without a pause the next pass would retry start()
                        # immediately and spin for as long as it keeps failing.
                        self._back_off_after_start_failure(failure_message, desired_backend_id)
                    next_send_at = time.perf_counter()
                    continue

                if action != "send_frame" or current_backend is None or config is None or frame is None:
                    continue

                send_started_at = time.perf_counter()
                result = current_backend.send_frame(config, frame)
                send_finished_at = time.perf_counter()
                send_duration_s = send_finished_at - send_started_at
                schedule_slip_s = max(0.0, send_started_at - scheduled_send_at)
                missed_budget = send_finished_at > (scheduled_send_at + interval_s)

                with self._condition:
                    self._last_send_duration_s = send_duration_s
                    self._last_schedule_slip_s = schedule_slip_s
                    if missed_budget:
                        self._send_budget_misses += 1
                    self._frames_sent += 1
                    self._sent_frame_version = max(self._sent_frame_version, frame_version)
                    self._packets_last_frame = result.packets_sent
                    self._devices_last_frame = result.device_count
                    self._packets_sent_total += result.packets_sent
                    self._last_result_message = result.message
                    # NOTE(review): a failed send without a message is not
                    # counted as a failure -- confirm that this is intended.
                    if not result.ok and result.message:
                        self._send_failures += 1
                        self._queue_status_message(result.message)
                    self._record_send_fps(send_finished_at)

                # Keep a stable cadence anchored to the worker clock instead of
                # adding a full extra interval after every send. If a send
                # overruns, jump to "now" and continue with the freshest frame
                # rather than compounding lag.
                next_send_at = max(next_send_at + interval_s, send_finished_at)
        finally:
            try:
                if current_backend is not None:
                    current_backend.stop()
            finally:
                # Clear the reference even when stop() raises, so a later
                # _ensure_worker_locked() can start a fresh worker.
                with self._condition:
                    if self._worker_thread is threading.current_thread():
                        self._worker_thread = None

    def _back_off_after_start_failure(self, message: str, failed_backend_id: str) -> None:
        """Report a backend start failure and pause before the next attempt.

        The pause ends early when the worker is asked to stop, output is
        disabled, or a different backend is selected, so those requests are
        still handled promptly.
        """
        retry_at = time.perf_counter() + _BACKEND_START_RETRY_DELAY_S
        with self._condition:
            # The message queue is shared with drain_status_messages(), so it
            # is only touched with the lock held.
            self._queue_status_message(message)
            while (
                not self._worker_stop_requested
                and self.output_enabled
                and self.active_backend_id == failed_backend_id
            ):
                remaining = retry_at - time.perf_counter()
                if remaining <= 0.0:
                    break
                self._condition.wait(timeout=remaining)

    def _telemetry_loop(self) -> None:
        """Telemetry thread body: poll the active backend for controller metrics.

        Polls happen ``CONTROLLER_TELEMETRY_INTERVAL_S`` apart while output is
        enabled and a config snapshot exists; metrics are reset to a default
        ``ControllerMetrics()`` while output is disabled.
        """
        next_poll_at = time.perf_counter()
        try:
            while True:
                backend: OutputBackend | None = None
                config: InfinityMirrorConfig | None = None
                with self._condition:
                    while True:
                        if self._telemetry_stop_requested:
                            return
                        if not self.output_enabled:
                            self._controller_metrics = ControllerMetrics()
                            self._condition.wait()
                            next_poll_at = time.perf_counter()
                            continue
                        config = self._config_snapshot
                        backend = self.backends[self.active_backend_id]
                        if config is None:
                            self._condition.wait(timeout=0.2)
                            next_poll_at = time.perf_counter()
                            continue
                        wait_timeout = next_poll_at - time.perf_counter()
                        if wait_timeout > 0.0:
                            self._condition.wait(timeout=wait_timeout)
                            continue
                        break

                # Poll without the lock so the output worker is not blocked.
                metrics = (
                    backend.controller_metrics(config)
                    if backend is not None and config is not None
                    else None
                )
                with self._condition:
                    self._controller_metrics = metrics if metrics is not None else ControllerMetrics()
                next_poll_at = time.perf_counter() + CONTROLLER_TELEMETRY_INTERVAL_S
        finally:
            with self._condition:
                if self._telemetry_thread is threading.current_thread():
                    self._telemetry_thread = None

    def _queue_status_message(self, message: str) -> None:
        """Queue ``message`` unless it is empty or repeats the last one queued.

        Must be called with the lock held.
        """
        if not message or message == self._last_queued_message:
            return
        self._pending_messages.append(message)
        self._last_queued_message = message

    def _record_send_fps(self, timestamp: float) -> None:
        """Update the measured send rate from a send that finished at ``timestamp``.

        The rate is recomputed once at least 0.5 s has passed since the window
        opened, as sends-after-the-opening-sample divided by elapsed time.
        Must be called with the lock held.
        """
        if self._send_window_started_at <= 0.0:
            # The first send only opens the window. It marks time zero and is
            # not counted, matching how every later window is reset; counting
            # it would report N+1 sends over N intervals.
            self._send_window_started_at = timestamp
            self._send_window_count = 0
            self._send_fps = 0.0
            return
        self._send_window_count += 1
        elapsed = timestamp - self._send_window_started_at
        if elapsed >= 0.5:
            self._send_fps = self._send_window_count / elapsed
            self._send_window_started_at = timestamp
            self._send_window_count = 0

    @staticmethod
    def _clamp_target_fps(value: float) -> float:
        """Coerce ``value`` to float and clamp it to the supported FPS range."""
        return max(MIN_OUTPUT_FPS, min(MAX_OUTPUT_FPS, float(value)))