"""Controller tying pattern rendering, live/next scene state, and output together.

The controller owns a render timer, two editable scenes ("live" and "next"),
a tempo transport, and an optional cross-fade transition between scenes. Each
render tick produces a live frame, submits it to the output manager, and
emits throttled preview frames to the UI.
"""

from __future__ import annotations

from pathlib import Path
import time
import traceback

from app.qt_compat import QObject, QTimer, Qt, Signal
from app.config.models import InfinityMirrorConfig
from app.config.xml_mapping import MappingValidationError, load_config, save_config
from app.core.diagnostics import RealtimeDiagnostics
from app.core.pattern_engine import PatternEngine
from app.core.pattern_compat import normalize_pattern_request
from app.core.presets import PresetRecord, PresetStore
from app.core.types import PatternParameters, PreviewFrame, SceneState, SceneTransition, blend_preview_frames
from app.output.manager import OutputManager

# Parameter names whose values are coerced with float() in set_parameter().
NUMERIC_PARAMETER_KEYS = {
    "brightness",
    "fade",
    "tempo_multiplier",
    "angle",
    "on_width",
    "off_width",
    "block_size",
    "pixel_group_size",
    "strobe_duty_cycle",
    "randomness",
}

DEFAULT_TEMPO_BPM = 120.0
MIN_TEMPO_BPM = 10.0
MAX_TEMPO_BPM = 300.0
# Drives the render timer interval (1000 / fps, rounded to whole milliseconds).
DEFAULT_RENDER_FPS = 60.0
# Preview emit rate when no live hardware output is active.
DEFAULT_PREVIEW_FPS = 30.0
# Lower preview emit rate used while output is enabled on a backend that
# supports live output (see _preview_emit_interval_s).
PRIORITIZED_PREVIEW_FPS = 12.0


def _clamp_tempo_bpm(value: float) -> float:
    """Return *value* as a float limited to [MIN_TEMPO_BPM, MAX_TEMPO_BPM]."""
    return max(MIN_TEMPO_BPM, min(MAX_TEMPO_BPM, float(value)))


def _legacy_speed_to_tempo_bpm(value: float) -> float:
    """Convert a legacy ``speed`` value to BPM (value * 60), clamped to range.

    NOTE(review): the factor of 60 suggests the legacy value was expressed
    per second - confirm against the retired "speed" parameter definition.
    """
    return _clamp_tempo_bpm(float(value) * 60.0)


class InfinityMirrorController(QObject):
    """Render loop and scene/tempo state holder for the application.

    Signals:
        frame_ready: emitted with the live preview frame (throttled).
        next_frame_ready: emitted with the "next" preview frame (throttled,
            only supplied while FOH mode is enabled).
        state_changed: emitted after controller state is modified.
        config_changed: emitted after a new configuration is installed.
        presets_changed: emitted after a preset is saved or deleted.
        status_message: emitted with a human-readable status string.
    """

    frame_ready = Signal(object)
    next_frame_ready = Signal(object)
    state_changed = Signal()
    config_changed = Signal()
    presets_changed = Signal()
    status_message = Signal(str)

    def __init__(self, project_root: str | Path, output_manager: OutputManager | None = None) -> None:
        """Create the controller and start the render timer.

        Args:
            project_root: Base directory; presets are stored under
                ``<project_root>/presets`` and default mappings are looked up
                relative to it.
            output_manager: Output manager to use. A new ``OutputManager`` is
                created when omitted.
        """
        super().__init__()
        self.project_root = Path(project_root)
        self.output_manager = output_manager if output_manager is not None else OutputManager()
        self.preset_store = PresetStore(self.project_root / "presets")
        self.preset_store.ensure_seed_presets()
        self.config = InfinityMirrorConfig()
        self.mapping_path: Path | None = None

        # Scene state: "live" is what is rendered to output, "next" is the
        # staged scene edited while FOH mode is enabled.
        self._live_scene = SceneState()
        self._next_scene = self._live_scene.clone()
        self.foh_mode_enabled = False

        # Tempo transport: phase is computed relative to an anchor so that
        # tempo changes do not cause a phase jump (see _retime_transport).
        self.tempo_bpm = DEFAULT_TEMPO_BPM
        self._tempo_anchor_time_s = 0.0
        self._tempo_anchor_phase = 0.0
        self._transport_time_s = 0.0

        self.transition_duration_s = 2.0
        self._scene_transition: SceneTransition | None = None
        self.live_engine = PatternEngine()
        self.next_engine = PatternEngine()
        self._transition_engine: PatternEngine | None = None

        self.utility_mode = "none"
        self.selected_tile_id: str | None = None
        self.current_frame: PreviewFrame | None = None
        self.next_frame: PreviewFrame | None = None

        # Render metrics and de-duplication of status messages.
        self._last_output_message = ""
        self._frames_rendered = 0
        self._last_render_duration_s = 0.0
        self._render_window_started_at = 0.0
        self._render_window_count = 0
        self._render_fps = 0.0

        # Timestamps of the most recent preview emits; None means "never".
        self._last_live_preview_emit_time_s: float | None = None
        self._last_next_preview_emit_time_s: float | None = None

        self.output_manager.update_config(self.config)

        self.timer = QTimer(self)
        self.timer.setTimerType(Qt.PreciseTimer)
        # Interval is in whole milliseconds and never below 1 ms.
        self.timer.setInterval(max(1, int(round(1000.0 / DEFAULT_RENDER_FPS))))
        self.timer.timeout.connect(self.render_once)
        self.timer.start()

    @property
    def pattern_id(self) -> str:
        """Pattern id of the currently editable scene."""
        return self._editable_scene().pattern_id

    @pattern_id.setter
    def pattern_id(self, value: str) -> None:
        self.set_pattern(value)

    @property
    def params(self) -> PatternParameters:
        """A clone of the editable scene's parameters."""
        return self._editable_scene().params.clone()

    @params.setter
    def params(self, value: PatternParameters) -> None:
        self.set_params(value)

    @property
    def transition_active(self) -> bool:
        """True while a fade transition is in progress."""
        return self._scene_transition is not None

    def live_scene(self) -> SceneState:
        """Return a clone of the live scene."""
        return self.scene_state("live")

    def next_scene(self) -> SceneState:
        """Return a clone of the next scene."""
        return self.scene_state("next")

    def scene_state(self, role: str = "live") -> SceneState:
        """Return a clone of the scene for *role* ("next", otherwise live)."""
        return self._scene_for_role(role).clone()

    def preview_frame_for(self, role: str = "live") -> PreviewFrame | None:
        """Return the last rendered frame for *role*, or None if none exists."""
        if self._normalize_scene_role(role) == "next":
            return self.next_frame
        return self.current_frame

    def startup_mapping_candidates(self) -> list[Path]:
        """Return the mapping files probed at startup, in priority order."""
        return [
            self.project_root / "sample_data" / "infinity_mirror_mapping_clean.xml",
            self.project_root / "infinity_mirror_mapping_clean.xml",
        ]

    def load_initial_config(self) -> None:
        """Load the first existing startup mapping, or report that none exists."""
        for candidate in self.startup_mapping_candidates():
            if candidate.exists():
                self.load_mapping(candidate)
                return
        self.status_message.emit("No default mapping found. Use File > Open Mapping.")

    def load_mapping(self, path: str | Path) -> None:
        """Load a mapping file, install it, and render a fresh preview.

        Exceptions raised by ``load_config`` propagate to the caller; see
        ``safe_load_mapping`` for a variant that reports validation errors.
        """
        config = load_config(path)
        self._install_config(config, path)
        self.status_message.emit(f"Loaded mapping: {Path(path).name}")
        self.render_once(force_preview=True)

    def save_mapping(self, path: str | Path | None = None) -> None:
        """Save the current configuration to *path* or the remembered path.

        Raises:
            ValueError: if no path is given and no mapping path is set.
        """
        target = Path(path) if path is not None else self.mapping_path
        if target is None:
            raise ValueError("No mapping path set.")
        save_config(self.config, target)
        self.mapping_path = target
        self.status_message.emit(f"Saved mapping: {target.name}")

    def replace_config(self, config: InfinityMirrorConfig, path: str | Path | None = None) -> None:
        """Install an in-memory configuration and render a fresh preview.

        Args:
            config: Configuration to install.
            path: When given, becomes the remembered mapping path.
        """
        self._install_config(config, path)
        self.render_once(force_preview=True)

    def _install_config(self, config: InfinityMirrorConfig, path: str | Path | None = None) -> None:
        """Make *config* current and notify listeners (does not render)."""
        self.config = config
        if path is not None:
            self.mapping_path = Path(path)
        # Keep the selection valid: fall back to the first sorted tile, or
        # clear it when the configuration has no tiles.
        if self.selected_tile_id not in self.config.tile_lookup():
            first_tile = self.config.sorted_tiles()[0] if self.config.tiles else None
            self.selected_tile_id = first_tile.tile_id if first_tile else None
        self._reset_render_state()
        self.output_manager.update_config(self.config)
        self.config_changed.emit()
        self.state_changed.emit()

    def _reset_render_state(self) -> None:
        """Discard engines, frames, any active transition, and render metrics."""
        self.live_engine = PatternEngine()
        self.next_engine = PatternEngine()
        self._transition_engine = None
        self._scene_transition = None
        self.current_frame = None
        self.next_frame = None
        self._last_output_message = ""
        self._frames_rendered = 0
        self._last_render_duration_s = 0.0
        self._render_window_started_at = 0.0
        self._render_window_count = 0
        self._render_fps = 0.0

    def _editable_scene(self) -> SceneState:
        """Return the scene object that edits currently apply to."""
        return self._scene_for_role(self._editable_scene_role())

    def _editable_scene_role(self) -> str:
        """Return "next" while FOH mode is enabled, otherwise "live"."""
        return "next" if self.foh_mode_enabled else "live"

    def _normalize_scene_role(self, role: str = "live") -> str:
        """Map *role* to "next" (case/whitespace-insensitive) or else "live"."""
        return "next" if str(role).strip().lower() == "next" else "live"

    def _scene_for_role(self, role: str = "live") -> SceneState:
        """Return the stored (not cloned) scene object for *role*."""
        return self._next_scene if self._normalize_scene_role(role) == "next" else self._live_scene

    def _replace_scene(self, role: str, scene: SceneState) -> None:
        """Store a clone of *scene* as the scene for *role*."""
        cloned_scene = scene.clone()
        if self._normalize_scene_role(role) == "next":
            self._next_scene = cloned_scene
        else:
            self._live_scene = cloned_scene

    def _editable_scene_snapshot(self) -> SceneState:
        """Return a clone of the editable scene for modification."""
        return self._editable_scene().clone()

    def _replace_editable_scene(self, scene: SceneState) -> None:
        """Store a clone of *scene* as the editable scene."""
        self._replace_scene(self._editable_scene_role(), scene)

    def _sync_next_to_live(self) -> None:
        """Make the next scene a copy of live and drop its engine and frame."""
        self._next_scene = self._live_scene.clone()
        self.next_engine = PatternEngine()
        self.next_frame = None

    def _normalize_pattern_request(
        self,
        pattern_id: str,
        params: PatternParameters | None = None,
    ) -> tuple[str, PatternParameters | None]:
        """Run ``normalize_pattern_request`` with the current display size."""
        return normalize_pattern_request(
            pattern_id,
            params,
            rows=self.config.logical_display.rows,
            cols=self.config.logical_display.cols,
        )

    def _render_scene_frame(
        self,
        engine: PatternEngine,
        scene: SceneState,
        timestamp: float,
        utility_mode: str = "none",
    ) -> PreviewFrame:
        """Render *scene* with *engine* at *timestamp* using the current tempo."""
        tempo_phase = self._tempo_phase_at(timestamp)
        return engine.render_frame(
            config=self.config,
            pattern_id=scene.pattern_id,
            params=scene.params,
            utility_mode=utility_mode,
            selected_tile_id=self.selected_tile_id,
            timestamp=timestamp,
            tempo_bpm=self.tempo_bpm,
            tempo_phase=tempo_phase,
        )

    def _tempo_phase_at(self, timestamp: float) -> float:
        """Return the tempo phase (in beats) at *timestamp*.

        Phase advances from the anchor at ``tempo_bpm / 60`` beats per second.
        """
        elapsed_s = float(timestamp) - self._tempo_anchor_time_s
        return self._tempo_anchor_phase + elapsed_s * (self.tempo_bpm / 60.0)

    def _retime_transport(self, tempo_bpm: float, timestamp: float) -> None:
        """Switch to *tempo_bpm* at *timestamp* while keeping phase continuous."""
        # Capture the phase under the old tempo before changing anything, then
        # re-anchor there so the phase does not jump.
        current_phase = self._tempo_phase_at(timestamp)
        self.tempo_bpm = tempo_bpm
        self._tempo_anchor_time_s = float(timestamp)
        self._tempo_anchor_phase = current_phase
        self._transport_time_s = float(timestamp)

    def set_tempo_bpm(self, value: float, timestamp: float | None = None) -> None:
        """Set the global tempo (clamped) and re-render.

        Args:
            value: Requested tempo in BPM.
            timestamp: Time at which the change takes effect. When omitted the
                last transport time is used as the anchor and the follow-up
                render uses the current clock.
        """
        tempo_bpm = _clamp_tempo_bpm(float(value))
        if abs(tempo_bpm - self.tempo_bpm) < 1e-9:
            return
        anchor_time = self._transport_time_s if timestamp is None else float(timestamp)
        self._retime_transport(tempo_bpm, anchor_time)
        self.state_changed.emit()
        self.render_once(timestamp=anchor_time if timestamp is not None else None, force_preview=True)

    def set_foh_mode(self, enabled: bool) -> None:
        """Enable or disable FOH mode (edits go to the next scene when on)."""
        enabled = bool(enabled)
        if self.foh_mode_enabled == enabled:
            return
        self.foh_mode_enabled = enabled
        # Leaving FOH mode discards staged edits, unless a transition is
        # running (in that case the next scene is left untouched here).
        if not enabled and self._scene_transition is None:
            self._sync_next_to_live()
        self.state_changed.emit()
        self.render_once(force_preview=True)

    def set_transition_duration(self, duration_s: float) -> None:
        """Set the default fade duration, limited to 0..60 seconds."""
        duration = max(0.0, min(60.0, float(duration_s)))
        if abs(duration - self.transition_duration_s) < 1e-9:
            return
        self.transition_duration_s = duration
        self.state_changed.emit()

    def set_pattern(self, pattern_id: str) -> None:
        """Select *pattern_id* on the editable scene and re-render."""
        scene = self._editable_scene_snapshot()
        pattern_id, params = self._normalize_pattern_request(pattern_id, scene.params)
        scene.pattern_id = pattern_id
        if params is not None:
            scene.params = params
        self._replace_editable_scene(scene)
        if not self.foh_mode_enabled:
            self.utility_mode = "none"
            self._sync_next_to_live()
        self.state_changed.emit()
        self.render_once(force_preview=True)

    def set_params(self, value: PatternParameters) -> None:
        """Replace the editable scene's parameters with a sanitized clone."""
        scene = self._editable_scene_snapshot()
        params = value.clone().sanitized()
        if params == scene.params:
            return
        scene.params = params
        self._replace_editable_scene(scene)
        if not self.foh_mode_enabled:
            self._sync_next_to_live()
        self.state_changed.emit()
        self.render_once(force_preview=True)

    def set_parameter(self, key: str, value) -> None:
        """Set a single parameter on the editable scene.

        The legacy key ``"speed"`` is redirected to the global tempo and
        ``"step_size"`` is rejected with a status message. Unknown keys and
        invalid values are reported through ``status_message`` rather than
        raised.
        """
        if key == "speed":
            try:
                self.set_tempo_bpm(_legacy_speed_to_tempo_bpm(float(value)))
            except (TypeError, ValueError) as exc:
                self.status_message.emit(f"Invalid value for {key}: {value!r} ({exc})")
            return
        if key == "step_size":
            self.status_message.emit("Step Size has been retired. Use the global BPM control instead.")
            return

        scene = self._editable_scene_snapshot()
        if not hasattr(scene.params, key):
            self.status_message.emit(f"Unknown parameter ignored: {key}")
            return

        # Rebuild the parameters through from_dict so the new value goes
        # through the same construction path as loaded parameters.
        payload = {
            field_name: getattr(scene.params, field_name)
            for field_name in scene.params.__dataclass_fields__
        }
        try:
            if key in NUMERIC_PARAMETER_KEYS:
                payload[key] = float(value)
            else:
                payload[key] = value
            params = PatternParameters.from_dict(payload)
        except (TypeError, ValueError) as exc:
            self.status_message.emit(f"Invalid value for {key}: {value!r} ({exc})")
            return

        if params == scene.params:
            return
        scene.params = params
        self._replace_editable_scene(scene)
        if not self.foh_mode_enabled:
            self._sync_next_to_live()
        self.state_changed.emit()
        # While the timer is running the next tick picks up the change, so an
        # immediate render is only needed when it is stopped or nothing has
        # been rendered yet.
        if not self.timer.isActive() or self.current_frame is None:
            self.render_once(force_preview=True)

    def set_selected_tile(self, tile_id: str | None) -> None:
        """Record the selected tile id and notify listeners."""
        self.selected_tile_id = tile_id
        self.state_changed.emit()

    def set_backend(self, backend_id: str) -> None:
        """Activate the output backend identified by *backend_id*."""
        self.output_manager.set_active_backend(backend_id)
        self.state_changed.emit()
        self.status_message.emit(f"Active backend: {self.output_manager.active_backend().display_name}")

    def set_output_enabled(self, enabled: bool) -> None:
        """Enable or disable hardware output and report the new state."""
        self.output_manager.set_output_enabled(enabled)
        self.state_changed.emit()
        state = "enabled" if enabled else "disabled"
        self.status_message.emit(f"Hardware output {state}.")

    def set_output_target_fps(self, value: float) -> None:
        """Forward the target output frame rate to the output manager."""
        self.output_manager.set_target_fps(value)
        self.state_changed.emit()

    def set_utility_mode(self, utility_mode: str) -> None:
        """Set the utility mode passed to live rendering and re-render."""
        self.utility_mode = utility_mode
        self.state_changed.emit()
        self.render_once(force_preview=True)

    def clear_utility_mode(self) -> None:
        """Reset the utility mode to "none" and re-render."""
        self.utility_mode = "none"
        self.state_changed.emit()
        self.render_once(force_preview=True)

    def go_scene(self) -> None:
        """Make the next scene live immediately, cancelling any transition."""
        self.utility_mode = "none"
        self._scene_transition = None
        self._transition_engine = None
        self._live_scene = self._next_scene.clone()
        self.live_engine = PatternEngine()
        if not self.foh_mode_enabled:
            self._sync_next_to_live()
        self.state_changed.emit()
        self.render_once(force_preview=True)
        self.status_message.emit("Go: next scene is now live.")

    def fade_go(self, duration_s: float | None = None, timestamp: float | None = None) -> None:
        """Start a cross-fade from the current live frame to the next scene.

        Args:
            duration_s: Fade length in seconds, limited to 0..60. Defaults to
                the stored transition duration. The value used is also stored
                as the new default. A duration of 0 performs an immediate
                ``go_scene``.
            timestamp: Start time of the transition; defaults to
                ``time.perf_counter()``.
        """
        duration = self.transition_duration_s if duration_s is None else max(0.0, min(60.0, float(duration_s)))
        self.transition_duration_s = duration
        if duration <= 0.0:
            self.go_scene()
            return

        now = time.perf_counter() if timestamp is None else timestamp
        if self.current_frame is None:
            self.render_once(timestamp=now, force_preview=True)
        source_frame = self.current_frame
        if source_frame is None:
            # render_once produced nothing (it returns early without tiles and
            # swallows render errors), so render the source frame directly.
            source_frame = self._render_scene_frame(
                self.live_engine,
                self._live_scene,
                now,
                utility_mode=self.utility_mode,
            )

        self.utility_mode = "none"
        self._transition_engine = PatternEngine()
        self._scene_transition = SceneTransition(
            started_at=now,
            duration_s=duration,
            source_frame=source_frame,
            target_scene=self._next_scene.clone(),
        )
        self.state_changed.emit()
        self.render_once(timestamp=now, force_preview=True)
        self.status_message.emit(f"Fade Go: {duration:.1f}s transition started.")

    def available_patterns(self):
        """Return the pattern descriptors reported by the live engine."""
        return self.live_engine.descriptors()

    def available_presets(self) -> list[PresetRecord]:
        """Return the presets listed by the preset store."""
        return self.preset_store.list_presets()

    def save_current_preset(self, name: str) -> None:
        """Save the editable scene and current tempo as preset *name*."""
        record = PresetRecord.create(
            name=name,
            pattern_id=self.pattern_id,
            params=self.params,
            tempo_bpm=self.tempo_bpm,
        )
        self.preset_store.save(record)
        self.presets_changed.emit()
        self.status_message.emit(f"Saved preset: {name}")

    def apply_preset(self, preset_name: str) -> None:
        """Load preset *preset_name* into the editable scene and re-render.

        The preset's tempo is applied when present; otherwise a legacy
        ``speed`` entry in the preset parameters is converted to BPM.
        """
        record = self.preset_store.load(preset_name)
        params = PatternParameters.from_dict(record.parameters)
        pattern_id, normalized_params = self._normalize_pattern_request(record.pattern_id, params)

        scene = self._editable_scene_snapshot()
        scene.pattern_id = pattern_id
        scene.params = normalized_params if normalized_params is not None else params
        self._replace_editable_scene(scene)

        preset_tempo_bpm: float | None = None
        if record.tempo_bpm is not None:
            preset_tempo_bpm = _clamp_tempo_bpm(record.tempo_bpm)
        elif "speed" in record.parameters:
            try:
                preset_tempo_bpm = _legacy_speed_to_tempo_bpm(float(record.parameters["speed"]))
            except (TypeError, ValueError):
                # Best effort: an unparsable legacy speed leaves the tempo as is.
                pass
        if preset_tempo_bpm is not None:
            self._retime_transport(preset_tempo_bpm, self._transport_time_s)

        if not self.foh_mode_enabled:
            self.utility_mode = "none"
            self._sync_next_to_live()
        self.state_changed.emit()
        self.render_once(force_preview=True)
        self.status_message.emit(f"Loaded preset: {preset_name}")

    def delete_preset(self, preset_name: str) -> None:
        """Delete preset *preset_name* and notify listeners."""
        self.preset_store.delete(preset_name)
        self.presets_changed.emit()
        self.status_message.emit(f"Deleted preset: {preset_name}")

    def render_once(self, timestamp: float | None = None, force_preview: bool = False) -> None:
        """Render one frame, submit it to output, and emit previews.

        Does nothing when the configuration has no tiles. Rendering errors are
        caught, reported through ``status_message``, and never propagate.

        Args:
            timestamp: Render time; defaults to ``time.perf_counter()``.
            force_preview: Emit preview frames regardless of throttling.
        """
        if not self.config.tiles:
            return
        render_started_at = time.perf_counter()
        now = time.perf_counter() if timestamp is None else timestamp
        self._transport_time_s = float(now)
        try:
            if self._scene_transition is None:
                live_frame = self._render_scene_frame(
                    self.live_engine,
                    self._live_scene,
                    now,
                    utility_mode=self.utility_mode,
                )
            else:
                transition = self._scene_transition
                if self._transition_engine is None:
                    self._transition_engine = PatternEngine()
                target_frame = self._render_scene_frame(self._transition_engine, transition.target_scene, now)
                elapsed = max(0.0, now - transition.started_at)
                alpha = 1.0 if transition.duration_s <= 0.0 else min(1.0, elapsed / transition.duration_s)
                live_frame = blend_preview_frames(transition.source_frame, target_frame, alpha)
                if alpha >= 1.0:
                    # Transition finished: the target becomes the live scene and
                    # the transition engine is promoted to keep its state.
                    self._live_scene = transition.target_scene.clone()
                    self.live_engine = self._transition_engine
                    self._transition_engine = None
                    self._scene_transition = None
                    live_frame = target_frame
                    if not self.foh_mode_enabled:
                        self._sync_next_to_live()
                    self.state_changed.emit()

            self.current_frame = live_frame
            if self.foh_mode_enabled:
                self.next_frame = self._render_scene_frame(self.next_engine, self._next_scene, now)
            elif (
                self._next_scene.pattern_id == self._live_scene.pattern_id
                and self._next_scene.params.to_dict() == self._live_scene.params.to_dict()
            ):
                # Next mirrors live, so the live frame doubles as its preview.
                self.next_frame = live_frame

            self.output_manager.submit_frame(live_frame)
            self._emit_preview_frames(
                live_frame,
                self.next_frame if self.foh_mode_enabled else None,
                now,
                force_preview,
            )
            self._emit_output_status_messages()
        except Exception as exc:  # pragma: no cover - UI safety net
            message = f"Render error: {exc}"
            # Report (and print the traceback for) a given error only once so a
            # persistent failure does not flood the UI and stderr on each tick.
            if message != self._last_output_message:
                self._last_output_message = message
                self.status_message.emit(message)
                traceback.print_exc()
        finally:
            self._record_render_metrics(render_started_at, time.perf_counter())

    def realtime_diagnostics(self) -> RealtimeDiagnostics:
        """Combine render metrics with the output manager's diagnostics snapshot."""
        output = self.output_manager.diagnostics_snapshot()
        return RealtimeDiagnostics(
            backend_id=output.backend_id,
            backend_name=output.backend_name,
            output_enabled=output.output_enabled,
            worker_running=output.worker_running,
            target_output_fps=output.target_fps,
            render_fps=self._render_fps,
            send_fps=output.send_fps,
            last_render_time_ms=self._last_render_duration_s * 1000.0,
            last_send_time_ms=output.last_send_time_ms,
            frames_rendered=self._frames_rendered,
            frames_submitted=output.frames_submitted,
            frames_sent=output.frames_sent,
            stale_frame_drops=output.stale_frame_drops,
            send_failures=output.send_failures,
            packets_last_frame=output.packets_last_frame,
            devices_last_frame=output.devices_last_frame,
            packets_sent_total=output.packets_sent_total,
            last_output_message=output.last_message,
            send_budget_misses=output.send_budget_misses,
            last_schedule_slip_ms=output.last_schedule_slip_ms,
            controller_fps=output.controller_fps,
            controller_live_devices=output.controller_live_devices,
            controller_sampled_devices=output.controller_sampled_devices,
            controller_total_devices=output.controller_total_devices,
            controller_source=output.controller_source,
        )

    def shutdown(self) -> None:
        """Stop the render timer and shut down the output manager."""
        self.timer.stop()
        self.output_manager.shutdown()

    def safe_load_mapping(self, path: str | Path) -> tuple[bool, list[str]]:
        """Load a mapping, returning ``(ok, errors)`` instead of raising on
        ``MappingValidationError``. Other exceptions still propagate.
        """
        try:
            self.load_mapping(path)
        except MappingValidationError as exc:
            return False, exc.errors
        return True, []

    def _emit_output_status_messages(self) -> None:
        """Forward queued output-manager messages, skipping consecutive repeats."""
        for message in self.output_manager.drain_status_messages():
            if message != self._last_output_message:
                self._last_output_message = message
                self.status_message.emit(message)

    def _record_render_metrics(self, started_at: float, finished_at: float) -> None:
        """Update render duration, frame count, and the windowed render FPS.

        FPS is recomputed once at least 0.5 s has elapsed in the current
        measurement window.
        """
        self._last_render_duration_s = max(0.0, finished_at - started_at)
        self._frames_rendered += 1
        if self._render_window_started_at <= 0.0:
            # No window yet: this frame only marks the window start. It is not
            # counted, matching how later windows restart at zero, so the first
            # FPS reading is not inflated by one frame.
            self._render_window_started_at = finished_at
            self._render_window_count = 0
            self._render_fps = 0.0
            return
        self._render_window_count += 1
        elapsed = finished_at - self._render_window_started_at
        if elapsed >= 0.5:
            self._render_fps = self._render_window_count / elapsed
            self._render_window_started_at = finished_at
            self._render_window_count = 0

    def _emit_preview_frames(
        self,
        live_frame: PreviewFrame,
        next_frame: PreviewFrame | None,
        timestamp: float,
        force_preview: bool,
    ) -> None:
        """Emit live and next preview signals when each is due."""
        if self._preview_emit_due(self._last_live_preview_emit_time_s, timestamp, force_preview):
            self._last_live_preview_emit_time_s = timestamp
            self.frame_ready.emit(live_frame)
        if next_frame is not None and self._preview_emit_due(
            self._last_next_preview_emit_time_s, timestamp, force_preview
        ):
            self._last_next_preview_emit_time_s = timestamp
            self.next_frame_ready.emit(next_frame)

    def _preview_emit_due(self, last_emit_time_s: float | None, timestamp: float, force_preview: bool) -> bool:
        """Return True when a preview should be emitted at *timestamp*."""
        if force_preview or last_emit_time_s is None:
            return True
        # A timestamp at or before the last emit (e.g. a caller-supplied time)
        # cannot be throttled meaningfully, so emit.
        if float(timestamp) <= last_emit_time_s:
            return True
        return (float(timestamp) - last_emit_time_s) >= self._preview_emit_interval_s()

    def _preview_emit_interval_s(self) -> float:
        """Return the minimum interval between preview emits, in seconds."""
        try:
            backend = self.output_manager.active_backend()
            live_output_active = self.output_manager.output_enabled and backend.supports_live_output
        except Exception:
            # Best effort: if the backend cannot be queried, use the default rate.
            live_output_active = False
        preview_fps = PRIORITIZED_PREVIEW_FPS if live_output_active else DEFAULT_PREVIEW_FPS
        return 1.0 / max(1.0, preview_fps)