392 lines
17 KiB
Python
392 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import deque
|
|
import threading
|
|
import time
|
|
|
|
from app.config.models import InfinityMirrorConfig
|
|
from app.core.types import PreviewFrame
|
|
|
|
from .artnet import ArtnetOutputBackend
|
|
from .base import ControllerMetrics, OutputBackend, OutputDiagnostics, OutputResult
|
|
from .ddp import DdpOutputBackend
|
|
from .preview import PreviewOutputBackend
|
|
|
|
DEFAULT_OUTPUT_FPS = 40.0
MIN_OUTPUT_FPS = 1.0
MAX_OUTPUT_FPS = 60.0

# Seconds between controller telemetry polls in ``OutputManager._telemetry_loop``.
# NOTE(review): the loop referenced this name but the module never defined or
# imported it, so the first completed poll raised NameError. The value below is
# a placeholder -- confirm the intended polling interval.
CONTROLLER_TELEMETRY_INTERVAL_S = 1.0

# Minimum pause before the worker retries a backend whose ``start()`` raised
# OSError, so a persistently failing backend does not spin the worker thread.
BACKEND_START_RETRY_DELAY_S = 0.5

_CONTROLLER_POLLING_DISABLED_SOURCE = "Controller-side FPS polling disabled during live output for stability."


class OutputManager:
    """Hand the most recent preview frame to the active output backend on a worker thread.

    Callers publish a configuration (``update_config``) and frames
    (``submit_frame`` / ``push_frame``). While output is enabled, a daemon
    worker thread sends the latest frame through the active backend at
    ``target_fps`` and records the counters exposed by
    ``diagnostics_snapshot``. All mutable state is guarded by one
    non-reentrant lock, which is shared with the condition variable used to
    wake the worker.
    """

    def __init__(self, target_fps: float = DEFAULT_OUTPUT_FPS) -> None:
        """Create the backends and initialise all counters; no thread is started here.

        Args:
            target_fps: Desired send rate, clamped to
                ``[MIN_OUTPUT_FPS, MAX_OUTPUT_FPS]``.
        """
        self.backends: dict[str, OutputBackend] = {
            PreviewOutputBackend.backend_id: PreviewOutputBackend(),
            DdpOutputBackend.backend_id: DdpOutputBackend(),
            ArtnetOutputBackend.backend_id: ArtnetOutputBackend(),
        }
        self.active_backend_id = PreviewOutputBackend.backend_id
        self.output_enabled = False

        self._target_fps = self._clamp_target_fps(target_fps)
        self._config_snapshot: InfinityMirrorConfig | None = None
        # id() of the caller's config object that produced the snapshot.
        self._config_source_id: int | None = None
        self._latest_frame: PreviewFrame | None = None
        # Version counters let submit_frame detect frames replaced before being sent.
        self._latest_frame_version = 0
        self._sent_frame_version = 0

        self._frames_submitted = 0
        self._frames_sent = 0
        self._stale_frame_drops = 0
        self._send_failures = 0
        self._packets_last_frame = 0
        self._devices_last_frame = 0
        self._packets_sent_total = 0
        self._last_send_duration_s = 0.0
        self._send_budget_misses = 0
        self._last_schedule_slip_s = 0.0
        self._last_result_message = ""
        self._send_window_started_at = 0.0
        self._send_window_count = 0
        self._send_fps = 0.0
        self._controller_metrics = ControllerMetrics(
            source=_CONTROLLER_POLLING_DISABLED_SOURCE,
        )
        self._pending_messages: deque[str] = deque()
        self._last_queued_message = ""

        # The lock is NOT reentrant: helpers suffixed ``_locked`` (and the
        # private recorders) must be called with it already held and must
        # never acquire it themselves.
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)
        self._worker_thread: threading.Thread | None = None
        self._worker_stop_requested = False
        self._telemetry_thread: threading.Thread | None = None
        self._telemetry_stop_requested = False

    def __del__(self) -> None:  # pragma: no cover - destructor timing is interpreter-dependent
        """Best-effort shutdown when the manager is garbage collected."""
        try:
            self.shutdown()
        except Exception:
            # Deliberate: a destructor must not raise, and the instance may be
            # only partially initialised if __init__ failed part-way.
            pass

    def backend_names(self) -> list[tuple[str, str]]:
        """Return ``(backend_id, display_name)`` pairs for every registered backend."""
        return [(backend_id, backend.display_name) for backend_id, backend in self.backends.items()]

    def set_active_backend(self, backend_id: str) -> None:
        """Select the backend the worker should send through.

        Raises:
            KeyError: If ``backend_id`` is not a registered backend.
        """
        with self._condition:
            if backend_id not in self.backends:
                raise KeyError(f"Unknown backend: {backend_id}")
            if self.active_backend_id == backend_id:
                return
            self.active_backend_id = backend_id
            # Wake the worker so it can stop the old backend and start the new one.
            self._condition.notify_all()

    def set_output_enabled(self, enabled: bool) -> None:
        """Enable or disable live output; enabling starts the worker if needed."""
        with self._condition:
            enabled = bool(enabled)
            if self.output_enabled == enabled:
                return
            self.output_enabled = enabled
            if enabled:
                self._ensure_worker_locked()
            self._controller_metrics = ControllerMetrics(
                source=_CONTROLLER_POLLING_DISABLED_SOURCE,
            )
            self._condition.notify_all()

    def set_target_fps(self, value: float) -> None:
        """Set the send rate, clamped to ``[MIN_OUTPUT_FPS, MAX_OUTPUT_FPS]``."""
        with self._condition:
            target_fps = self._clamp_target_fps(value)
            if abs(target_fps - self._target_fps) < 1e-9:
                return
            self._target_fps = target_fps
            self._condition.notify_all()

    def target_fps(self) -> float:
        """Return the current (clamped) target send rate."""
        with self._lock:
            return self._target_fps

    def active_backend(self) -> OutputBackend:
        """Return the backend object currently selected for output."""
        with self._lock:
            return self.backends[self.active_backend_id]

    def update_config(self, config: InfinityMirrorConfig) -> None:
        """Store a clone of ``config`` for the worker and remember the source object's id."""
        # Clone before taking the lock so the copy does not block the worker.
        config_snapshot = config.clone()
        with self._condition:
            self._config_snapshot = config_snapshot
            self._config_source_id = id(config)
            self._condition.notify_all()

    def submit_frame(self, frame: PreviewFrame) -> OutputResult:
        """Publish ``frame`` as the latest frame for the worker to send.

        If output is enabled and the previously submitted frame was never
        sent, it is counted as a stale drop. The returned result is always
        ``ok=True``; its message says whether output is enabled.
        """
        with self._condition:
            if self.output_enabled and self._latest_frame_version > self._sent_frame_version:
                self._stale_frame_drops += 1
            self._latest_frame = frame
            self._latest_frame_version += 1
            self._frames_submitted += 1
            if self.output_enabled:
                self._ensure_worker_locked()
            self._condition.notify_all()
            message = "Frame submitted to output worker." if self.output_enabled else "Hardware output disabled."
            return OutputResult(ok=True, message=message)

    def push_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
        """Refresh the config snapshot if ``config`` is a different object, then submit ``frame``.

        Change detection compares ``id(config)`` only, so in-place mutation of
        the same config object does not refresh the snapshot; call
        ``update_config`` explicitly in that case.
        """
        with self._lock:
            config_source_id = self._config_source_id
        # update_config takes the (non-reentrant) lock itself, so it must be
        # called only after the lock above has been released.
        if config_source_id != id(config):
            self.update_config(config)
        return self.submit_frame(frame)

    def drain_status_messages(self) -> list[str]:
        """Return and clear the queued status messages, oldest first."""
        with self._lock:
            messages = list(self._pending_messages)
            self._pending_messages.clear()
            return messages

    def diagnostics_snapshot(self) -> OutputDiagnostics:
        """Return a consistent copy of the current counters and state."""
        with self._lock:
            backend = self.backends[self.active_backend_id]
            worker_running = self.output_enabled and self._worker_thread is not None and self._worker_thread.is_alive()
            return OutputDiagnostics(
                backend_id=self.active_backend_id,
                backend_name=backend.display_name,
                output_enabled=self.output_enabled,
                worker_running=worker_running,
                target_fps=self._target_fps,
                send_fps=self._send_fps,
                last_send_time_ms=self._last_send_duration_s * 1000.0,
                frames_submitted=self._frames_submitted,
                frames_sent=self._frames_sent,
                stale_frame_drops=self._stale_frame_drops,
                send_failures=self._send_failures,
                packets_last_frame=self._packets_last_frame,
                devices_last_frame=self._devices_last_frame,
                packets_sent_total=self._packets_sent_total,
                last_message=self._last_result_message,
                send_budget_misses=self._send_budget_misses,
                last_schedule_slip_ms=self._last_schedule_slip_s * 1000.0,
                controller_fps=self._controller_metrics.fps,
                controller_live_devices=self._controller_metrics.live_devices,
                controller_sampled_devices=self._controller_metrics.sampled_devices,
                controller_total_devices=self._controller_metrics.total_devices,
                controller_source=self._controller_metrics.source,
            )

    def shutdown(self) -> None:
        """Disable output and ask both background threads to stop.

        Each thread is joined for at most one second; a thread reference is
        cleared only if that thread has actually exited.
        """
        thread: threading.Thread | None = None
        telemetry_thread: threading.Thread | None = None
        with self._condition:
            self.output_enabled = False
            self._worker_stop_requested = True
            self._telemetry_stop_requested = True
            thread = self._worker_thread
            telemetry_thread = self._telemetry_thread
            self._condition.notify_all()
        # Join outside the lock: the threads need the lock to observe the stop flags.
        if thread is not None:
            thread.join(timeout=1.0)
        if telemetry_thread is not None:
            telemetry_thread.join(timeout=1.0)
        with self._condition:
            if self._worker_thread is thread and thread is not None and not thread.is_alive():
                self._worker_thread = None
            if self._telemetry_thread is telemetry_thread and telemetry_thread is not None and not telemetry_thread.is_alive():
                self._telemetry_thread = None

    def _ensure_worker_locked(self) -> None:
        """Start the output worker thread if it is not alive. Caller must hold the lock."""
        if self._worker_thread is not None and self._worker_thread.is_alive():
            return
        self._worker_stop_requested = False
        self._worker_thread = threading.Thread(
            target=self._worker_loop,
            name="InfinityMirrorOutputWorker",
            daemon=True,
        )
        self._worker_thread.start()

    def _ensure_telemetry_locked(self) -> None:
        """Start the telemetry thread if it is not alive. Caller must hold the lock.

        Nothing in this class calls this helper at present, so the telemetry
        loop only runs if external code starts it.
        """
        if self._telemetry_thread is not None and self._telemetry_thread.is_alive():
            return
        self._telemetry_stop_requested = False
        self._telemetry_thread = threading.Thread(
            target=self._telemetry_loop,
            name="InfinityMirrorTelemetryWorker",
            daemon=True,
        )
        self._telemetry_thread.start()

    def _wait_before_backend_retry(self, failed_backend_id: str) -> None:
        """Pause up to ``BACKEND_START_RETRY_DELAY_S`` after a failed backend start.

        Returns early if a stop is requested, output is disabled, or a
        different backend is selected, so those changes are acted on promptly.
        """
        deadline = time.perf_counter() + BACKEND_START_RETRY_DELAY_S
        with self._condition:
            while True:
                if self._worker_stop_requested or not self.output_enabled:
                    return
                if self.active_backend_id != failed_backend_id:
                    return
                remaining = deadline - time.perf_counter()
                if remaining <= 0.0:
                    return
                self._condition.wait(timeout=remaining)

    def _worker_loop(self) -> None:
        """Worker thread body: start/stop backends and send the latest frame on schedule.

        Each outer iteration decides on one action while holding the lock
        (disable the backend, switch backend, or send a frame) and then
        performs it with the lock released so callers are never blocked by
        backend I/O.
        """
        current_backend_id: str | None = None
        current_backend: OutputBackend | None = None
        next_send_at = time.perf_counter()

        try:
            while True:
                action = "wait"
                desired_backend_id = ""
                desired_backend: OutputBackend | None = None
                config: InfinityMirrorConfig | None = None
                frame: PreviewFrame | None = None
                frame_version = 0
                interval_s = 1.0 / DEFAULT_OUTPUT_FPS
                scheduled_send_at = next_send_at

                with self._condition:
                    while True:
                        if self._worker_stop_requested:
                            return

                        desired_backend_id = self.active_backend_id
                        desired_backend = self.backends[desired_backend_id]
                        interval_s = 1.0 / self._target_fps

                        if not self.output_enabled:
                            if current_backend is not None:
                                action = "disable_backend"
                                break
                            self._condition.wait()
                            # Restart the cadence from "now" after idling.
                            next_send_at = time.perf_counter()
                            continue

                        if current_backend_id != desired_backend_id or current_backend is None:
                            action = "switch_backend"
                            break

                        config = self._config_snapshot
                        frame = self._latest_frame
                        frame_version = self._latest_frame_version
                        if config is None or frame is None:
                            self._condition.wait()
                            next_send_at = time.perf_counter()
                            continue

                        now = time.perf_counter()
                        wait_timeout = next_send_at - now
                        if wait_timeout > 0.0:
                            # Woken early by a notify or on timeout; either way
                            # re-read state so the freshest frame is sent.
                            self._condition.wait(timeout=wait_timeout)
                            continue

                        scheduled_send_at = next_send_at
                        action = "send_frame"
                        break

                if action == "disable_backend":
                    current_backend.stop()
                    current_backend = None
                    current_backend_id = None
                    next_send_at = time.perf_counter()
                    continue

                if action == "switch_backend":
                    if current_backend is not None:
                        current_backend.stop()
                    current_backend = desired_backend
                    current_backend_id = desired_backend_id
                    try:
                        current_backend.start()
                    except OSError as exc:
                        # The message queue is shared with drain_status_messages,
                        # so it must only be touched while holding the lock.
                        with self._condition:
                            self._queue_status_message(f"{current_backend.display_name} start failed: {exc}")
                        current_backend = None
                        current_backend_id = None
                        # Without a pause the next iteration would retry start()
                        # immediately and spin on a persistently failing backend.
                        self._wait_before_backend_retry(desired_backend_id)
                    next_send_at = time.perf_counter()
                    continue

                if action != "send_frame" or current_backend is None or config is None or frame is None:
                    continue

                send_started_at = time.perf_counter()
                result = current_backend.send_frame(config, frame)
                send_finished_at = time.perf_counter()
                send_duration_s = send_finished_at - send_started_at
                schedule_slip_s = max(0.0, send_started_at - scheduled_send_at)
                missed_budget = send_finished_at > (scheduled_send_at + interval_s)

                with self._condition:
                    self._last_send_duration_s = send_duration_s
                    self._last_schedule_slip_s = schedule_slip_s
                    if missed_budget:
                        self._send_budget_misses += 1
                    self._frames_sent += 1
                    self._sent_frame_version = max(self._sent_frame_version, frame_version)
                    self._packets_last_frame = result.packets_sent
                    self._devices_last_frame = result.device_count
                    self._packets_sent_total += result.packets_sent
                    self._last_result_message = result.message
                    if not result.ok and result.message:
                        self._send_failures += 1
                        self._queue_status_message(result.message)
                    self._record_send_fps(send_finished_at)

                # Keep a stable cadence anchored to the worker clock instead of adding
                # a full extra interval after every send. If a send overruns, jump to
                # "now" and continue with the freshest frame rather than compounding lag.
                next_send_at = max(next_send_at + interval_s, send_finished_at)
        finally:
            try:
                if current_backend is not None:
                    current_backend.stop()
            finally:
                # Clear the thread reference even if stop() raised, but only if
                # it still refers to this thread (a replacement may have started).
                with self._condition:
                    if self._worker_thread is threading.current_thread():
                        self._worker_thread = None

    def _telemetry_loop(self) -> None:
        """Telemetry thread body: periodically store ``backend.controller_metrics(config)``.

        While output is disabled the stored metrics are reset to an empty
        ``ControllerMetrics()`` and the thread sleeps until notified.
        """
        next_poll_at = time.perf_counter()
        try:
            while True:
                backend: OutputBackend | None = None
                config: InfinityMirrorConfig | None = None
                with self._condition:
                    while True:
                        if self._telemetry_stop_requested:
                            return
                        if not self.output_enabled:
                            self._controller_metrics = ControllerMetrics()
                            self._condition.wait()
                            next_poll_at = time.perf_counter()
                            continue
                        config = self._config_snapshot
                        backend = self.backends[self.active_backend_id]
                        if config is None:
                            self._condition.wait(timeout=0.2)
                            next_poll_at = time.perf_counter()
                            continue
                        now = time.perf_counter()
                        wait_timeout = next_poll_at - now
                        if wait_timeout > 0.0:
                            self._condition.wait(timeout=wait_timeout)
                            continue
                        break

                # Poll with the lock released so frame submission is not blocked.
                metrics = backend.controller_metrics(config) if backend is not None and config is not None else None
                with self._condition:
                    self._controller_metrics = metrics if metrics is not None else ControllerMetrics()
                next_poll_at = time.perf_counter() + CONTROLLER_TELEMETRY_INTERVAL_S
        finally:
            with self._condition:
                if self._telemetry_thread is threading.current_thread():
                    self._telemetry_thread = None

    def _queue_status_message(self, message: str) -> None:
        """Queue ``message`` unless it is empty or repeats the last queued message.

        Caller must hold the lock; this method does not acquire it.
        """
        if not message or message == self._last_queued_message:
            return
        self._pending_messages.append(message)
        self._last_queued_message = message

    def _record_send_fps(self, timestamp: float) -> None:
        """Update the measured send rate from a send completed at ``timestamp``.

        The rate is recomputed once at least 0.5 s has elapsed since the
        window started. Caller must hold the lock.
        """
        if self._send_window_started_at <= 0.0:
            # The first send only marks the window origin. Counting it as a
            # send inside the window would inflate the first reported rate by
            # one frame relative to every later window (which restarts at 0).
            self._send_window_started_at = timestamp
            self._send_window_count = 0
            self._send_fps = 0.0
            return

        self._send_window_count += 1
        elapsed = timestamp - self._send_window_started_at
        if elapsed >= 0.5:
            self._send_fps = self._send_window_count / elapsed
            self._send_window_started_at = timestamp
            self._send_window_count = 0

    @staticmethod
    def _clamp_target_fps(value: float) -> float:
        """Return ``value`` as a float limited to ``[MIN_OUTPUT_FPS, MAX_OUTPUT_FPS]``."""
        return max(MIN_OUTPUT_FPS, min(MAX_OUTPUT_FPS, float(value)))