Files
RFP_Infinity-Vis/app/core/controller.py

610 lines
24 KiB
Python

from __future__ import annotations
from pathlib import Path
import time
import traceback
from app.qt_compat import QObject, QTimer, Qt, Signal
from app.config.models import InfinityMirrorConfig
from app.config.xml_mapping import MappingValidationError, load_config, save_config
from app.core.diagnostics import RealtimeDiagnostics
from app.core.pattern_engine import PatternEngine
from app.core.pattern_compat import normalize_pattern_request
from app.core.presets import PresetRecord, PresetStore
from app.core.types import PatternParameters, PreviewFrame, SceneState, SceneTransition, blend_preview_frames
from app.output.manager import OutputManager
NUMERIC_PARAMETER_KEYS = {
"brightness",
"fade",
"tempo_multiplier",
"angle",
"on_width",
"off_width",
"block_size",
"pixel_group_size",
"strobe_duty_cycle",
"randomness",
}
DEFAULT_TEMPO_BPM = 120.0
MIN_TEMPO_BPM = 10.0
MAX_TEMPO_BPM = 300.0
DEFAULT_RENDER_FPS = 60.0
DEFAULT_PREVIEW_FPS = 30.0
PRIORITIZED_PREVIEW_FPS = 12.0
def _clamp_tempo_bpm(value: float) -> float:
return max(MIN_TEMPO_BPM, min(MAX_TEMPO_BPM, float(value)))
def _legacy_speed_to_tempo_bpm(value: float) -> float:
return _clamp_tempo_bpm(float(value) * 60.0)
class InfinityMirrorController(QObject):
frame_ready = Signal(object)
next_frame_ready = Signal(object)
state_changed = Signal()
config_changed = Signal()
presets_changed = Signal()
status_message = Signal(str)
def __init__(self, project_root: str | Path, output_manager: OutputManager | None = None) -> None:
super().__init__()
self.project_root = Path(project_root)
self.output_manager = output_manager if output_manager is not None else OutputManager()
self.preset_store = PresetStore(self.project_root / "presets")
self.preset_store.ensure_seed_presets()
self.config = InfinityMirrorConfig()
self.mapping_path: Path | None = None
self._live_scene = SceneState()
self._next_scene = self._live_scene.clone()
self.foh_mode_enabled = False
self.tempo_bpm = DEFAULT_TEMPO_BPM
self._tempo_anchor_time_s = 0.0
self._tempo_anchor_phase = 0.0
self._transport_time_s = 0.0
self.transition_duration_s = 2.0
self._scene_transition: SceneTransition | None = None
self.live_engine = PatternEngine()
self.next_engine = PatternEngine()
self._transition_engine: PatternEngine | None = None
self.utility_mode = "none"
self.selected_tile_id: str | None = None
self.current_frame: PreviewFrame | None = None
self.next_frame: PreviewFrame | None = None
self._last_output_message = ""
self._frames_rendered = 0
self._last_render_duration_s = 0.0
self._render_window_started_at = 0.0
self._render_window_count = 0
self._render_fps = 0.0
self._last_live_preview_emit_time_s = None
self._last_next_preview_emit_time_s = None
self._last_live_preview_emit_time_s: float | None = None
self._last_next_preview_emit_time_s: float | None = None
self.output_manager.update_config(self.config)
self.timer = QTimer(self)
self.timer.setTimerType(Qt.PreciseTimer)
self.timer.setInterval(max(1, int(round(1000.0 / DEFAULT_RENDER_FPS))))
self.timer.timeout.connect(self.render_once)
self.timer.start()
@property
def pattern_id(self) -> str:
return self._editable_scene().pattern_id
@pattern_id.setter
def pattern_id(self, value: str) -> None:
self.set_pattern(value)
@property
def params(self) -> PatternParameters:
return self._editable_scene().params.clone()
@params.setter
def params(self, value: PatternParameters) -> None:
self.set_params(value)
@property
def transition_active(self) -> bool:
return self._scene_transition is not None
def live_scene(self) -> SceneState:
return self.scene_state("live")
def next_scene(self) -> SceneState:
return self.scene_state("next")
def scene_state(self, role: str = "live") -> SceneState:
return self._scene_for_role(role).clone()
def preview_frame_for(self, role: str = "live") -> PreviewFrame | None:
return self.next_frame if str(role).strip().lower() == "next" else self.current_frame
def startup_mapping_candidates(self) -> list[Path]:
return [
self.project_root / "sample_data" / "infinity_mirror_mapping_clean.xml",
self.project_root / "infinity_mirror_mapping_clean.xml",
]
def load_initial_config(self) -> None:
for candidate in self.startup_mapping_candidates():
if candidate.exists():
self.load_mapping(candidate)
return
self.status_message.emit("No default mapping found. Use File > Open Mapping.")
def load_mapping(self, path: str | Path) -> None:
config = load_config(path)
self.config = config
self.mapping_path = Path(path)
if self.selected_tile_id not in self.config.tile_lookup():
first_tile = self.config.sorted_tiles()[0] if self.config.tiles else None
self.selected_tile_id = first_tile.tile_id if first_tile else None
self._reset_render_state()
self.output_manager.update_config(self.config)
self.config_changed.emit()
self.state_changed.emit()
self.status_message.emit(f"Loaded mapping: {Path(path).name}")
self.render_once(force_preview=True)
def save_mapping(self, path: str | Path | None = None) -> None:
target = Path(path) if path is not None else self.mapping_path
if target is None:
raise ValueError("No mapping path set.")
save_config(self.config, target)
self.mapping_path = target
self.status_message.emit(f"Saved mapping: {target.name}")
def replace_config(self, config: InfinityMirrorConfig, path: str | Path | None = None) -> None:
self.config = config
if path is not None:
self.mapping_path = Path(path)
if self.selected_tile_id not in self.config.tile_lookup():
first_tile = self.config.sorted_tiles()[0] if self.config.tiles else None
self.selected_tile_id = first_tile.tile_id if first_tile else None
self._reset_render_state()
self.output_manager.update_config(self.config)
self.config_changed.emit()
self.state_changed.emit()
self.render_once(force_preview=True)
def _reset_render_state(self) -> None:
self.live_engine = PatternEngine()
self.next_engine = PatternEngine()
self._transition_engine = None
self._scene_transition = None
self.current_frame = None
self.next_frame = None
self._last_output_message = ""
self._frames_rendered = 0
self._last_render_duration_s = 0.0
self._render_window_started_at = 0.0
self._render_window_count = 0
self._render_fps = 0.0
def _editable_scene(self) -> SceneState:
return self._scene_for_role(self._editable_scene_role())
def _editable_scene_role(self) -> str:
return "next" if self.foh_mode_enabled else "live"
def _normalize_scene_role(self, role: str = "live") -> str:
return "next" if str(role).strip().lower() == "next" else "live"
def _scene_for_role(self, role: str = "live") -> SceneState:
return self._next_scene if self._normalize_scene_role(role) == "next" else self._live_scene
def _replace_scene(self, role: str, scene: SceneState) -> None:
cloned_scene = scene.clone()
if self._normalize_scene_role(role) == "next":
self._next_scene = cloned_scene
else:
self._live_scene = cloned_scene
def _editable_scene_snapshot(self) -> SceneState:
return self._editable_scene().clone()
def _replace_editable_scene(self, scene: SceneState) -> None:
self._replace_scene(self._editable_scene_role(), scene)
def _sync_next_to_live(self) -> None:
self._next_scene = self._live_scene.clone()
self.next_engine = PatternEngine()
self.next_frame = None
def _normalize_pattern_request(self, pattern_id: str, params: PatternParameters | None = None) -> tuple[str, PatternParameters | None]:
return normalize_pattern_request(
pattern_id,
params,
rows=self.config.logical_display.rows,
cols=self.config.logical_display.cols,
)
def _render_scene_frame(
self,
engine: PatternEngine,
scene: SceneState,
timestamp: float,
utility_mode: str = "none",
) -> PreviewFrame:
tempo_phase = self._tempo_phase_at(timestamp)
return engine.render_frame(
config=self.config,
pattern_id=scene.pattern_id,
params=scene.params,
utility_mode=utility_mode,
selected_tile_id=self.selected_tile_id,
timestamp=timestamp,
tempo_bpm=self.tempo_bpm,
tempo_phase=tempo_phase,
)
def _tempo_phase_at(self, timestamp: float) -> float:
return self._tempo_anchor_phase + (float(timestamp) - self._tempo_anchor_time_s) * (self.tempo_bpm / 60.0)
def _retime_transport(self, tempo_bpm: float, timestamp: float) -> None:
current_phase = self._tempo_phase_at(timestamp)
self.tempo_bpm = tempo_bpm
self._tempo_anchor_time_s = float(timestamp)
self._tempo_anchor_phase = current_phase
self._transport_time_s = float(timestamp)
def set_tempo_bpm(self, value: float, timestamp: float | None = None) -> None:
tempo_bpm = _clamp_tempo_bpm(float(value))
if abs(tempo_bpm - self.tempo_bpm) < 1e-9:
return
anchor_time = self._transport_time_s if timestamp is None else float(timestamp)
self._retime_transport(tempo_bpm, anchor_time)
self.state_changed.emit()
self.render_once(timestamp=anchor_time if timestamp is not None else None, force_preview=True)
def set_foh_mode(self, enabled: bool) -> None:
enabled = bool(enabled)
if self.foh_mode_enabled == enabled:
return
self.foh_mode_enabled = enabled
if not enabled:
if self._scene_transition is None:
self._sync_next_to_live()
self.state_changed.emit()
self.render_once(force_preview=True)
def set_transition_duration(self, duration_s: float) -> None:
duration = max(0.0, min(60.0, float(duration_s)))
if abs(duration - self.transition_duration_s) < 1e-9:
return
self.transition_duration_s = duration
self.state_changed.emit()
def set_pattern(self, pattern_id: str) -> None:
scene = self._editable_scene_snapshot()
pattern_id, params = self._normalize_pattern_request(pattern_id, scene.params)
scene.pattern_id = pattern_id
if params is not None:
scene.params = params
self._replace_editable_scene(scene)
if not self.foh_mode_enabled:
self.utility_mode = "none"
self._sync_next_to_live()
self.state_changed.emit()
self.render_once(force_preview=True)
def set_params(self, value: PatternParameters) -> None:
scene = self._editable_scene_snapshot()
params = value.clone().sanitized()
if params == scene.params:
return
scene.params = params
self._replace_editable_scene(scene)
if not self.foh_mode_enabled:
self._sync_next_to_live()
self.state_changed.emit()
self.render_once(force_preview=True)
def set_parameter(self, key: str, value) -> None:
if key == "speed":
try:
self.set_tempo_bpm(_legacy_speed_to_tempo_bpm(float(value)))
except (TypeError, ValueError) as exc:
self.status_message.emit(f"Invalid value for {key}: {value!r} ({exc})")
return
if key == "step_size":
self.status_message.emit("Step Size has been retired. Use the global BPM control instead.")
return
scene = self._editable_scene_snapshot()
if not hasattr(scene.params, key):
self.status_message.emit(f"Unknown parameter ignored: {key}")
return
payload = {
field_name: getattr(scene.params, field_name)
for field_name in scene.params.__dataclass_fields__
}
try:
if key in NUMERIC_PARAMETER_KEYS:
payload[key] = float(value)
else:
payload[key] = value
params = PatternParameters.from_dict(payload)
except (TypeError, ValueError) as exc:
self.status_message.emit(f"Invalid value for {key}: {value!r} ({exc})")
return
if params == scene.params:
return
scene.params = params
self._replace_editable_scene(scene)
if not self.foh_mode_enabled:
self._sync_next_to_live()
self.state_changed.emit()
if not self.timer.isActive() or self.current_frame is None:
self.render_once(force_preview=True)
def set_selected_tile(self, tile_id: str | None) -> None:
self.selected_tile_id = tile_id
self.state_changed.emit()
def set_backend(self, backend_id: str) -> None:
self.output_manager.set_active_backend(backend_id)
self.state_changed.emit()
self.status_message.emit(f"Active backend: {self.output_manager.active_backend().display_name}")
def set_output_enabled(self, enabled: bool) -> None:
self.output_manager.set_output_enabled(enabled)
self.state_changed.emit()
state = "enabled" if enabled else "disabled"
self.status_message.emit(f"Hardware output {state}.")
def set_output_target_fps(self, value: float) -> None:
self.output_manager.set_target_fps(value)
self.state_changed.emit()
def set_utility_mode(self, utility_mode: str) -> None:
self.utility_mode = utility_mode
self.state_changed.emit()
self.render_once(force_preview=True)
def clear_utility_mode(self) -> None:
self.utility_mode = "none"
self.state_changed.emit()
self.render_once(force_preview=True)
def go_scene(self) -> None:
self.utility_mode = "none"
self._scene_transition = None
self._transition_engine = None
self._live_scene = self._next_scene.clone()
self.live_engine = PatternEngine()
if not self.foh_mode_enabled:
self._sync_next_to_live()
self.state_changed.emit()
self.render_once(force_preview=True)
self.status_message.emit("Go: next scene is now live.")
def fade_go(self, duration_s: float | None = None, timestamp: float | None = None) -> None:
duration = self.transition_duration_s if duration_s is None else max(0.0, min(60.0, float(duration_s)))
self.transition_duration_s = duration
if duration <= 0.0:
self.go_scene()
return
now = time.perf_counter() if timestamp is None else timestamp
if self.current_frame is None:
self.render_once(timestamp=now, force_preview=True)
source_frame = self.current_frame
if source_frame is None:
source_frame = self._render_scene_frame(self.live_engine, self._live_scene, now, utility_mode=self.utility_mode)
self.utility_mode = "none"
self._transition_engine = PatternEngine()
self._scene_transition = SceneTransition(
started_at=now,
duration_s=duration,
source_frame=source_frame,
target_scene=self._next_scene.clone(),
)
self.state_changed.emit()
self.render_once(timestamp=now, force_preview=True)
self.status_message.emit(f"Fade Go: {duration:.1f}s transition started.")
def available_patterns(self):
return self.live_engine.descriptors()
def available_presets(self) -> list[PresetRecord]:
return self.preset_store.list_presets()
def save_current_preset(self, name: str) -> None:
record = PresetRecord.create(name=name, pattern_id=self.pattern_id, params=self.params, tempo_bpm=self.tempo_bpm)
self.preset_store.save(record)
self.presets_changed.emit()
self.status_message.emit(f"Saved preset: {name}")
def apply_preset(self, preset_name: str) -> None:
record = self.preset_store.load(preset_name)
params = PatternParameters.from_dict(record.parameters)
pattern_id, normalized_params = self._normalize_pattern_request(record.pattern_id, params)
scene = self._editable_scene_snapshot()
scene.pattern_id = pattern_id
scene.params = normalized_params if normalized_params is not None else params
self._replace_editable_scene(scene)
preset_tempo_bpm: float | None = None
if record.tempo_bpm is not None:
preset_tempo_bpm = _clamp_tempo_bpm(record.tempo_bpm)
elif "speed" in record.parameters:
try:
preset_tempo_bpm = _legacy_speed_to_tempo_bpm(float(record.parameters["speed"]))
except (TypeError, ValueError):
pass
if preset_tempo_bpm is not None:
self._retime_transport(preset_tempo_bpm, self._transport_time_s)
if not self.foh_mode_enabled:
self.utility_mode = "none"
self._sync_next_to_live()
self.state_changed.emit()
self.render_once(force_preview=True)
self.status_message.emit(f"Loaded preset: {preset_name}")
def delete_preset(self, preset_name: str) -> None:
self.preset_store.delete(preset_name)
self.presets_changed.emit()
self.status_message.emit(f"Deleted preset: {preset_name}")
def render_once(self, timestamp: float | None = None, force_preview: bool = False) -> None:
if not self.config.tiles:
return
render_started_at = time.perf_counter()
now = time.perf_counter() if timestamp is None else timestamp
self._transport_time_s = float(now)
try:
if self._scene_transition is None:
live_frame = self._render_scene_frame(
self.live_engine,
self._live_scene,
now,
utility_mode=self.utility_mode,
)
else:
if self._transition_engine is None:
self._transition_engine = PatternEngine()
target_frame = self._render_scene_frame(self._transition_engine, self._scene_transition.target_scene, now)
elapsed = max(0.0, now - self._scene_transition.started_at)
alpha = 1.0 if self._scene_transition.duration_s <= 0.0 else min(1.0, elapsed / self._scene_transition.duration_s)
live_frame = blend_preview_frames(self._scene_transition.source_frame, target_frame, alpha)
if alpha >= 1.0:
self._live_scene = self._scene_transition.target_scene.clone()
self.live_engine = self._transition_engine
self._transition_engine = None
self._scene_transition = None
live_frame = target_frame
if not self.foh_mode_enabled:
self._sync_next_to_live()
self.state_changed.emit()
self.current_frame = live_frame
if self.foh_mode_enabled:
self.next_frame = self._render_scene_frame(self.next_engine, self._next_scene, now)
elif self._next_scene.pattern_id == self._live_scene.pattern_id and self._next_scene.params.to_dict() == self._live_scene.params.to_dict():
self.next_frame = live_frame
self.output_manager.submit_frame(live_frame)
self._emit_preview_frames(live_frame, self.next_frame if self.foh_mode_enabled else None, now, force_preview)
self._emit_output_status_messages()
except Exception as exc: # pragma: no cover - UI safety net
message = f"Render error: {exc}"
if message != self._last_output_message:
self._last_output_message = message
self.status_message.emit(message)
traceback.print_exc()
finally:
self._record_render_metrics(render_started_at, time.perf_counter())
def realtime_diagnostics(self) -> RealtimeDiagnostics:
output = self.output_manager.diagnostics_snapshot()
return RealtimeDiagnostics(
backend_id=output.backend_id,
backend_name=output.backend_name,
output_enabled=output.output_enabled,
worker_running=output.worker_running,
target_output_fps=output.target_fps,
render_fps=self._render_fps,
send_fps=output.send_fps,
last_render_time_ms=self._last_render_duration_s * 1000.0,
last_send_time_ms=output.last_send_time_ms,
frames_rendered=self._frames_rendered,
frames_submitted=output.frames_submitted,
frames_sent=output.frames_sent,
stale_frame_drops=output.stale_frame_drops,
send_failures=output.send_failures,
packets_last_frame=output.packets_last_frame,
devices_last_frame=output.devices_last_frame,
packets_sent_total=output.packets_sent_total,
last_output_message=output.last_message,
send_budget_misses=output.send_budget_misses,
last_schedule_slip_ms=output.last_schedule_slip_ms,
controller_fps=output.controller_fps,
controller_live_devices=output.controller_live_devices,
controller_sampled_devices=output.controller_sampled_devices,
controller_total_devices=output.controller_total_devices,
controller_source=output.controller_source,
)
def shutdown(self) -> None:
self.timer.stop()
self.output_manager.shutdown()
def safe_load_mapping(self, path: str | Path) -> tuple[bool, list[str]]:
try:
self.load_mapping(path)
return True, []
except MappingValidationError as exc:
return False, exc.errors
def _emit_output_status_messages(self) -> None:
for message in self.output_manager.drain_status_messages():
if message != self._last_output_message:
self._last_output_message = message
self.status_message.emit(message)
def _record_render_metrics(self, started_at: float, finished_at: float) -> None:
self._last_render_duration_s = max(0.0, finished_at - started_at)
self._frames_rendered += 1
if self._render_window_started_at <= 0.0:
self._render_window_started_at = finished_at
self._render_window_count = 1
self._render_fps = 0.0
return
self._render_window_count += 1
elapsed = finished_at - self._render_window_started_at
if elapsed >= 0.5:
self._render_fps = self._render_window_count / elapsed
self._render_window_started_at = finished_at
self._render_window_count = 0
def _emit_preview_frames(
self,
live_frame: PreviewFrame,
next_frame: PreviewFrame | None,
timestamp: float,
force_preview: bool,
) -> None:
if self._preview_emit_due(self._last_live_preview_emit_time_s, timestamp, force_preview):
self._last_live_preview_emit_time_s = timestamp
self.frame_ready.emit(live_frame)
if next_frame is not None and self._preview_emit_due(self._last_next_preview_emit_time_s, timestamp, force_preview):
self._last_next_preview_emit_time_s = timestamp
self.next_frame_ready.emit(next_frame)
def _preview_emit_due(self, last_emit_time_s: float | None, timestamp: float, force_preview: bool) -> bool:
if force_preview or last_emit_time_s is None:
return True
if float(timestamp) <= last_emit_time_s:
return True
return (float(timestamp) - last_emit_time_s) >= self._preview_emit_interval_s()
def _preview_emit_interval_s(self) -> float:
try:
backend = self.output_manager.active_backend()
live_output_active = self.output_manager.output_enabled and backend.supports_live_output
except Exception:
live_output_active = False
preview_fps = PRIORITIZED_PREVIEW_FPS if live_output_active else DEFAULT_PREVIEW_FPS
return 1.0 / max(1.0, preview_fps)