First upload, 18 controller version

This commit is contained in:
2026-04-14 15:23:56 +02:00
commit 8c55001a1c
3810 changed files with 764061 additions and 0 deletions

2
app/core/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""Pure Python core types and orchestration helpers."""

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

162
app/core/colors.py Normal file
View File

@@ -0,0 +1,162 @@
from __future__ import annotations
import colorsys
import math
from .types import RGBColor, clamp
PALETTES: dict[str, list[str]] = {
"Laser Club": ["#00F0FF", "#008CFF", "#6A00FF", "#060814"],
"Magenta Drive": ["#FF006E", "#FF4DA6", "#7A00FF", "#120318"],
"Warehouse Heat": ["#FF5A1F", "#FF9E00", "#FFD000", "#140600"],
"UV Riot": ["#7A00FF", "#B100FF", "#FF00A8", "#100014"],
"Redline": ["#FF2D55", "#FF6A00", "#FFB000", "#160406"],
"Sodium Haze": ["#FF7A00", "#FFB000", "#FFD86B", "#120700"],
"Afterhours": ["#F72585", "#B5179E", "#7209B7", "#14031A"],
"Voltage": ["#00E5FF", "#00B3FF", "#3A86FF", "#050A14"],
}
PALETTE_ALIASES: dict[str, str] = {
"Aurora": "Laser Club",
"Cinder": "Warehouse Heat",
"Sapphire": "Voltage",
"Deep Blue": "Voltage",
"Neon Tide": "Laser Club",
"Sunset Drive": "Redline",
"Moss Signal": "Sodium Haze",
"Polar": "Voltage",
"Signal Warm": "Warehouse Heat",
"Steel Bloom": "Magenta Drive",
}
DEFAULT_PALETTE = "Laser Club"
RANDOM_EFFECT_COLORS: tuple[RGBColor, ...] = (
RGBColor(1.0, 0.12, 0.12),
RGBColor(1.0, 0.42, 0.0),
RGBColor(1.0, 0.74, 0.0),
RGBColor(0.7, 1.0, 0.0),
RGBColor(0.0, 1.0, 0.3),
RGBColor(0.0, 0.86, 1.0),
RGBColor(0.0, 0.32, 1.0),
RGBColor(0.7, 0.0, 1.0),
)
def hex_to_color(value: str) -> RGBColor:
text = value.strip().lstrip("#")
if len(text) != 6:
return RGBColor.white()
try:
return RGBColor(*(int(text[index : index + 2], 16) / 255.0 for index in (0, 2, 4)))
except ValueError:
return RGBColor.white()
def canonical_palette_name(name: str) -> str:
candidate = PALETTE_ALIASES.get(name, name)
return candidate if candidate in PALETTES else DEFAULT_PALETTE
def palette_colors(name: str) -> list[RGBColor]:
raw = PALETTES[canonical_palette_name(name)]
return [hex_to_color(value) for value in raw]
def sample_palette(name: str, amount: float) -> RGBColor:
colors = palette_colors(name)
if len(colors) == 1:
return colors[0]
amount = clamp(amount)
scaled = amount * (len(colors) - 1)
index = int(math.floor(scaled))
next_index = min(len(colors) - 1, index + 1)
local = scaled - index
return colors[index].mix(colors[next_index], local)
def sample_random_effect_color(amount: float) -> RGBColor:
return sample_color_choices(RANDOM_EFFECT_COLORS, amount)
def sample_color_choices(colors: tuple[RGBColor, ...], amount: float) -> RGBColor:
amount = clamp(amount)
if not colors:
return RGBColor.white()
index = int(math.floor(amount * len(colors))) % len(colors)
return colors[index]
def custom_random_color_choices(primary_hex: str, secondary_hex: str) -> tuple[RGBColor, ...]:
primary = hex_to_color(primary_hex)
secondary = hex_to_color(secondary_hex)
if primary.to_8bit_tuple() == secondary.to_8bit_tuple():
return (primary,)
return (
primary,
primary.mix(secondary, 0.25),
primary.mix(secondary, 0.5),
primary.mix(secondary, 0.75),
secondary,
)
def sample_custom_random_color(primary_hex: str, secondary_hex: str, amount: float) -> RGBColor:
return sample_color_choices(custom_random_color_choices(primary_hex, secondary_hex), amount)
def smoothstep(edge0: float, edge1: float, value: float) -> float:
if edge0 == edge1:
return 0.0
amount = clamp((value - edge0) / (edge1 - edge0))
return amount * amount * (3.0 - 2.0 * amount)
def ease_in_out_sine(value: float) -> float:
return -(math.cos(math.pi * clamp(value)) - 1.0) / 2.0
def oscillate(time_s: float, speed: float = 1.0, phase: float = 0.0) -> float:
return 0.5 + 0.5 * math.sin(time_s * speed * math.tau + phase)
def brighten(color: RGBColor, amount: float) -> RGBColor:
return color.mix(RGBColor.white(), amount)
def darken(color: RGBColor, amount: float) -> RGBColor:
return color.mix(RGBColor.black(), amount)
def choose_pair(color_mode: str, primary_hex: str, secondary_hex: str, palette_name: str, amount: float) -> tuple[RGBColor, RGBColor]:
primary = hex_to_color(primary_hex)
secondary = hex_to_color(secondary_hex)
if color_mode == "palette":
primary = sample_palette(palette_name, amount)
secondary = sample_palette(palette_name, (amount + 0.38) % 1.0)
elif color_mode == "random_colors":
primary = sample_random_effect_color(amount)
secondary = darken(primary, 0.78)
elif color_mode == "custom_random":
primary = sample_custom_random_color(primary_hex, secondary_hex, amount)
secondary = darken(primary, 0.78)
elif color_mode == "mono":
# Mono should be a true single-color-on-black look so stepped patterns can
# reach an actual off state instead of hovering at a dim gray floor.
secondary = RGBColor.black()
elif color_mode == "complementary":
hue, lightness, saturation = colorsys.rgb_to_hls(primary.r, primary.g, primary.b)
red, green, blue = colorsys.hls_to_rgb((hue + 0.5) % 1.0, lightness, saturation)
secondary = RGBColor(red, green, blue)
return primary, secondary
def relative_luminance(color: RGBColor) -> float:
return 0.2126 * color.r + 0.7152 * color.g + 0.0722 * color.b
def label_contrast(color: RGBColor) -> RGBColor:
return RGBColor.black() if relative_luminance(color) > 0.6 else RGBColor.white()

609
app/core/controller.py Normal file
View File

@@ -0,0 +1,609 @@
from __future__ import annotations
from pathlib import Path
import time
import traceback
from app.qt_compat import QObject, QTimer, Qt, Signal
from app.config.models import InfinityMirrorConfig
from app.config.xml_mapping import MappingValidationError, load_config, save_config
from app.core.diagnostics import RealtimeDiagnostics
from app.core.pattern_engine import PatternEngine
from app.core.pattern_compat import normalize_pattern_request
from app.core.presets import PresetRecord, PresetStore
from app.core.types import PatternParameters, PreviewFrame, SceneState, SceneTransition, blend_preview_frames
from app.output.manager import OutputManager
NUMERIC_PARAMETER_KEYS = {
"brightness",
"fade",
"tempo_multiplier",
"angle",
"on_width",
"off_width",
"block_size",
"pixel_group_size",
"strobe_duty_cycle",
"randomness",
}
DEFAULT_TEMPO_BPM = 120.0
MIN_TEMPO_BPM = 10.0
MAX_TEMPO_BPM = 300.0
DEFAULT_RENDER_FPS = 60.0
DEFAULT_PREVIEW_FPS = 30.0
PRIORITIZED_PREVIEW_FPS = 12.0
def _clamp_tempo_bpm(value: float) -> float:
return max(MIN_TEMPO_BPM, min(MAX_TEMPO_BPM, float(value)))
def _legacy_speed_to_tempo_bpm(value: float) -> float:
return _clamp_tempo_bpm(float(value) * 60.0)
class InfinityMirrorController(QObject):
frame_ready = Signal(object)
next_frame_ready = Signal(object)
state_changed = Signal()
config_changed = Signal()
presets_changed = Signal()
status_message = Signal(str)
def __init__(self, project_root: str | Path, output_manager: OutputManager | None = None) -> None:
super().__init__()
self.project_root = Path(project_root)
self.output_manager = output_manager if output_manager is not None else OutputManager()
self.preset_store = PresetStore(self.project_root / "presets")
self.preset_store.ensure_seed_presets()
self.config = InfinityMirrorConfig()
self.mapping_path: Path | None = None
self._live_scene = SceneState()
self._next_scene = self._live_scene.clone()
self.foh_mode_enabled = False
self.tempo_bpm = DEFAULT_TEMPO_BPM
self._tempo_anchor_time_s = 0.0
self._tempo_anchor_phase = 0.0
self._transport_time_s = 0.0
self.transition_duration_s = 2.0
self._scene_transition: SceneTransition | None = None
self.live_engine = PatternEngine()
self.next_engine = PatternEngine()
self._transition_engine: PatternEngine | None = None
self.utility_mode = "none"
self.selected_tile_id: str | None = None
self.current_frame: PreviewFrame | None = None
self.next_frame: PreviewFrame | None = None
self._last_output_message = ""
self._frames_rendered = 0
self._last_render_duration_s = 0.0
self._render_window_started_at = 0.0
self._render_window_count = 0
self._render_fps = 0.0
self._last_live_preview_emit_time_s = None
self._last_next_preview_emit_time_s = None
self._last_live_preview_emit_time_s: float | None = None
self._last_next_preview_emit_time_s: float | None = None
self.output_manager.update_config(self.config)
self.timer = QTimer(self)
self.timer.setTimerType(Qt.PreciseTimer)
self.timer.setInterval(max(1, int(round(1000.0 / DEFAULT_RENDER_FPS))))
self.timer.timeout.connect(self.render_once)
self.timer.start()
@property
def pattern_id(self) -> str:
return self._editable_scene().pattern_id
@pattern_id.setter
def pattern_id(self, value: str) -> None:
self.set_pattern(value)
@property
def params(self) -> PatternParameters:
return self._editable_scene().params.clone()
@params.setter
def params(self, value: PatternParameters) -> None:
self.set_params(value)
@property
def transition_active(self) -> bool:
return self._scene_transition is not None
def live_scene(self) -> SceneState:
return self.scene_state("live")
def next_scene(self) -> SceneState:
return self.scene_state("next")
def scene_state(self, role: str = "live") -> SceneState:
return self._scene_for_role(role).clone()
def preview_frame_for(self, role: str = "live") -> PreviewFrame | None:
return self.next_frame if str(role).strip().lower() == "next" else self.current_frame
def startup_mapping_candidates(self) -> list[Path]:
return [
self.project_root / "sample_data" / "infinity_mirror_mapping_clean.xml",
self.project_root / "infinity_mirror_mapping_clean.xml",
]
def load_initial_config(self) -> None:
for candidate in self.startup_mapping_candidates():
if candidate.exists():
self.load_mapping(candidate)
return
self.status_message.emit("No default mapping found. Use File > Open Mapping.")
def load_mapping(self, path: str | Path) -> None:
config = load_config(path)
self.config = config
self.mapping_path = Path(path)
if self.selected_tile_id not in self.config.tile_lookup():
first_tile = self.config.sorted_tiles()[0] if self.config.tiles else None
self.selected_tile_id = first_tile.tile_id if first_tile else None
self._reset_render_state()
self.output_manager.update_config(self.config)
self.config_changed.emit()
self.state_changed.emit()
self.status_message.emit(f"Loaded mapping: {Path(path).name}")
self.render_once(force_preview=True)
def save_mapping(self, path: str | Path | None = None) -> None:
target = Path(path) if path is not None else self.mapping_path
if target is None:
raise ValueError("No mapping path set.")
save_config(self.config, target)
self.mapping_path = target
self.status_message.emit(f"Saved mapping: {target.name}")
def replace_config(self, config: InfinityMirrorConfig, path: str | Path | None = None) -> None:
self.config = config
if path is not None:
self.mapping_path = Path(path)
if self.selected_tile_id not in self.config.tile_lookup():
first_tile = self.config.sorted_tiles()[0] if self.config.tiles else None
self.selected_tile_id = first_tile.tile_id if first_tile else None
self._reset_render_state()
self.output_manager.update_config(self.config)
self.config_changed.emit()
self.state_changed.emit()
self.render_once(force_preview=True)
def _reset_render_state(self) -> None:
self.live_engine = PatternEngine()
self.next_engine = PatternEngine()
self._transition_engine = None
self._scene_transition = None
self.current_frame = None
self.next_frame = None
self._last_output_message = ""
self._frames_rendered = 0
self._last_render_duration_s = 0.0
self._render_window_started_at = 0.0
self._render_window_count = 0
self._render_fps = 0.0
def _editable_scene(self) -> SceneState:
return self._scene_for_role(self._editable_scene_role())
def _editable_scene_role(self) -> str:
return "next" if self.foh_mode_enabled else "live"
def _normalize_scene_role(self, role: str = "live") -> str:
return "next" if str(role).strip().lower() == "next" else "live"
def _scene_for_role(self, role: str = "live") -> SceneState:
return self._next_scene if self._normalize_scene_role(role) == "next" else self._live_scene
def _replace_scene(self, role: str, scene: SceneState) -> None:
cloned_scene = scene.clone()
if self._normalize_scene_role(role) == "next":
self._next_scene = cloned_scene
else:
self._live_scene = cloned_scene
def _editable_scene_snapshot(self) -> SceneState:
return self._editable_scene().clone()
def _replace_editable_scene(self, scene: SceneState) -> None:
self._replace_scene(self._editable_scene_role(), scene)
def _sync_next_to_live(self) -> None:
self._next_scene = self._live_scene.clone()
self.next_engine = PatternEngine()
self.next_frame = None
def _normalize_pattern_request(self, pattern_id: str, params: PatternParameters | None = None) -> tuple[str, PatternParameters | None]:
return normalize_pattern_request(
pattern_id,
params,
rows=self.config.logical_display.rows,
cols=self.config.logical_display.cols,
)
def _render_scene_frame(
self,
engine: PatternEngine,
scene: SceneState,
timestamp: float,
utility_mode: str = "none",
) -> PreviewFrame:
tempo_phase = self._tempo_phase_at(timestamp)
return engine.render_frame(
config=self.config,
pattern_id=scene.pattern_id,
params=scene.params,
utility_mode=utility_mode,
selected_tile_id=self.selected_tile_id,
timestamp=timestamp,
tempo_bpm=self.tempo_bpm,
tempo_phase=tempo_phase,
)
def _tempo_phase_at(self, timestamp: float) -> float:
return self._tempo_anchor_phase + (float(timestamp) - self._tempo_anchor_time_s) * (self.tempo_bpm / 60.0)
def _retime_transport(self, tempo_bpm: float, timestamp: float) -> None:
current_phase = self._tempo_phase_at(timestamp)
self.tempo_bpm = tempo_bpm
self._tempo_anchor_time_s = float(timestamp)
self._tempo_anchor_phase = current_phase
self._transport_time_s = float(timestamp)
def set_tempo_bpm(self, value: float, timestamp: float | None = None) -> None:
tempo_bpm = _clamp_tempo_bpm(float(value))
if abs(tempo_bpm - self.tempo_bpm) < 1e-9:
return
anchor_time = self._transport_time_s if timestamp is None else float(timestamp)
self._retime_transport(tempo_bpm, anchor_time)
self.state_changed.emit()
self.render_once(timestamp=anchor_time if timestamp is not None else None, force_preview=True)
def set_foh_mode(self, enabled: bool) -> None:
enabled = bool(enabled)
if self.foh_mode_enabled == enabled:
return
self.foh_mode_enabled = enabled
if not enabled:
if self._scene_transition is None:
self._sync_next_to_live()
self.state_changed.emit()
self.render_once(force_preview=True)
def set_transition_duration(self, duration_s: float) -> None:
duration = max(0.0, min(60.0, float(duration_s)))
if abs(duration - self.transition_duration_s) < 1e-9:
return
self.transition_duration_s = duration
self.state_changed.emit()
def set_pattern(self, pattern_id: str) -> None:
scene = self._editable_scene_snapshot()
pattern_id, params = self._normalize_pattern_request(pattern_id, scene.params)
scene.pattern_id = pattern_id
if params is not None:
scene.params = params
self._replace_editable_scene(scene)
if not self.foh_mode_enabled:
self.utility_mode = "none"
self._sync_next_to_live()
self.state_changed.emit()
self.render_once(force_preview=True)
def set_params(self, value: PatternParameters) -> None:
scene = self._editable_scene_snapshot()
params = value.clone().sanitized()
if params == scene.params:
return
scene.params = params
self._replace_editable_scene(scene)
if not self.foh_mode_enabled:
self._sync_next_to_live()
self.state_changed.emit()
self.render_once(force_preview=True)
def set_parameter(self, key: str, value) -> None:
if key == "speed":
try:
self.set_tempo_bpm(_legacy_speed_to_tempo_bpm(float(value)))
except (TypeError, ValueError) as exc:
self.status_message.emit(f"Invalid value for {key}: {value!r} ({exc})")
return
if key == "step_size":
self.status_message.emit("Step Size has been retired. Use the global BPM control instead.")
return
scene = self._editable_scene_snapshot()
if not hasattr(scene.params, key):
self.status_message.emit(f"Unknown parameter ignored: {key}")
return
payload = {
field_name: getattr(scene.params, field_name)
for field_name in scene.params.__dataclass_fields__
}
try:
if key in NUMERIC_PARAMETER_KEYS:
payload[key] = float(value)
else:
payload[key] = value
params = PatternParameters.from_dict(payload)
except (TypeError, ValueError) as exc:
self.status_message.emit(f"Invalid value for {key}: {value!r} ({exc})")
return
if params == scene.params:
return
scene.params = params
self._replace_editable_scene(scene)
if not self.foh_mode_enabled:
self._sync_next_to_live()
self.state_changed.emit()
if not self.timer.isActive() or self.current_frame is None:
self.render_once(force_preview=True)
def set_selected_tile(self, tile_id: str | None) -> None:
self.selected_tile_id = tile_id
self.state_changed.emit()
def set_backend(self, backend_id: str) -> None:
self.output_manager.set_active_backend(backend_id)
self.state_changed.emit()
self.status_message.emit(f"Active backend: {self.output_manager.active_backend().display_name}")
def set_output_enabled(self, enabled: bool) -> None:
self.output_manager.set_output_enabled(enabled)
self.state_changed.emit()
state = "enabled" if enabled else "disabled"
self.status_message.emit(f"Hardware output {state}.")
def set_output_target_fps(self, value: float) -> None:
self.output_manager.set_target_fps(value)
self.state_changed.emit()
def set_utility_mode(self, utility_mode: str) -> None:
self.utility_mode = utility_mode
self.state_changed.emit()
self.render_once(force_preview=True)
def clear_utility_mode(self) -> None:
self.utility_mode = "none"
self.state_changed.emit()
self.render_once(force_preview=True)
def go_scene(self) -> None:
self.utility_mode = "none"
self._scene_transition = None
self._transition_engine = None
self._live_scene = self._next_scene.clone()
self.live_engine = PatternEngine()
if not self.foh_mode_enabled:
self._sync_next_to_live()
self.state_changed.emit()
self.render_once(force_preview=True)
self.status_message.emit("Go: next scene is now live.")
def fade_go(self, duration_s: float | None = None, timestamp: float | None = None) -> None:
duration = self.transition_duration_s if duration_s is None else max(0.0, min(60.0, float(duration_s)))
self.transition_duration_s = duration
if duration <= 0.0:
self.go_scene()
return
now = time.perf_counter() if timestamp is None else timestamp
if self.current_frame is None:
self.render_once(timestamp=now, force_preview=True)
source_frame = self.current_frame
if source_frame is None:
source_frame = self._render_scene_frame(self.live_engine, self._live_scene, now, utility_mode=self.utility_mode)
self.utility_mode = "none"
self._transition_engine = PatternEngine()
self._scene_transition = SceneTransition(
started_at=now,
duration_s=duration,
source_frame=source_frame,
target_scene=self._next_scene.clone(),
)
self.state_changed.emit()
self.render_once(timestamp=now, force_preview=True)
self.status_message.emit(f"Fade Go: {duration:.1f}s transition started.")
def available_patterns(self):
return self.live_engine.descriptors()
def available_presets(self) -> list[PresetRecord]:
return self.preset_store.list_presets()
def save_current_preset(self, name: str) -> None:
record = PresetRecord.create(name=name, pattern_id=self.pattern_id, params=self.params, tempo_bpm=self.tempo_bpm)
self.preset_store.save(record)
self.presets_changed.emit()
self.status_message.emit(f"Saved preset: {name}")
def apply_preset(self, preset_name: str) -> None:
record = self.preset_store.load(preset_name)
params = PatternParameters.from_dict(record.parameters)
pattern_id, normalized_params = self._normalize_pattern_request(record.pattern_id, params)
scene = self._editable_scene_snapshot()
scene.pattern_id = pattern_id
scene.params = normalized_params if normalized_params is not None else params
self._replace_editable_scene(scene)
preset_tempo_bpm: float | None = None
if record.tempo_bpm is not None:
preset_tempo_bpm = _clamp_tempo_bpm(record.tempo_bpm)
elif "speed" in record.parameters:
try:
preset_tempo_bpm = _legacy_speed_to_tempo_bpm(float(record.parameters["speed"]))
except (TypeError, ValueError):
pass
if preset_tempo_bpm is not None:
self._retime_transport(preset_tempo_bpm, self._transport_time_s)
if not self.foh_mode_enabled:
self.utility_mode = "none"
self._sync_next_to_live()
self.state_changed.emit()
self.render_once(force_preview=True)
self.status_message.emit(f"Loaded preset: {preset_name}")
def delete_preset(self, preset_name: str) -> None:
self.preset_store.delete(preset_name)
self.presets_changed.emit()
self.status_message.emit(f"Deleted preset: {preset_name}")
def render_once(self, timestamp: float | None = None, force_preview: bool = False) -> None:
if not self.config.tiles:
return
render_started_at = time.perf_counter()
now = time.perf_counter() if timestamp is None else timestamp
self._transport_time_s = float(now)
try:
if self._scene_transition is None:
live_frame = self._render_scene_frame(
self.live_engine,
self._live_scene,
now,
utility_mode=self.utility_mode,
)
else:
if self._transition_engine is None:
self._transition_engine = PatternEngine()
target_frame = self._render_scene_frame(self._transition_engine, self._scene_transition.target_scene, now)
elapsed = max(0.0, now - self._scene_transition.started_at)
alpha = 1.0 if self._scene_transition.duration_s <= 0.0 else min(1.0, elapsed / self._scene_transition.duration_s)
live_frame = blend_preview_frames(self._scene_transition.source_frame, target_frame, alpha)
if alpha >= 1.0:
self._live_scene = self._scene_transition.target_scene.clone()
self.live_engine = self._transition_engine
self._transition_engine = None
self._scene_transition = None
live_frame = target_frame
if not self.foh_mode_enabled:
self._sync_next_to_live()
self.state_changed.emit()
self.current_frame = live_frame
if self.foh_mode_enabled:
self.next_frame = self._render_scene_frame(self.next_engine, self._next_scene, now)
elif self._next_scene.pattern_id == self._live_scene.pattern_id and self._next_scene.params.to_dict() == self._live_scene.params.to_dict():
self.next_frame = live_frame
self.output_manager.submit_frame(live_frame)
self._emit_preview_frames(live_frame, self.next_frame if self.foh_mode_enabled else None, now, force_preview)
self._emit_output_status_messages()
except Exception as exc: # pragma: no cover - UI safety net
message = f"Render error: {exc}"
if message != self._last_output_message:
self._last_output_message = message
self.status_message.emit(message)
traceback.print_exc()
finally:
self._record_render_metrics(render_started_at, time.perf_counter())
def realtime_diagnostics(self) -> RealtimeDiagnostics:
output = self.output_manager.diagnostics_snapshot()
return RealtimeDiagnostics(
backend_id=output.backend_id,
backend_name=output.backend_name,
output_enabled=output.output_enabled,
worker_running=output.worker_running,
target_output_fps=output.target_fps,
render_fps=self._render_fps,
send_fps=output.send_fps,
last_render_time_ms=self._last_render_duration_s * 1000.0,
last_send_time_ms=output.last_send_time_ms,
frames_rendered=self._frames_rendered,
frames_submitted=output.frames_submitted,
frames_sent=output.frames_sent,
stale_frame_drops=output.stale_frame_drops,
send_failures=output.send_failures,
packets_last_frame=output.packets_last_frame,
devices_last_frame=output.devices_last_frame,
packets_sent_total=output.packets_sent_total,
last_output_message=output.last_message,
send_budget_misses=output.send_budget_misses,
last_schedule_slip_ms=output.last_schedule_slip_ms,
controller_fps=output.controller_fps,
controller_live_devices=output.controller_live_devices,
controller_sampled_devices=output.controller_sampled_devices,
controller_total_devices=output.controller_total_devices,
controller_source=output.controller_source,
)
def shutdown(self) -> None:
self.timer.stop()
self.output_manager.shutdown()
def safe_load_mapping(self, path: str | Path) -> tuple[bool, list[str]]:
try:
self.load_mapping(path)
return True, []
except MappingValidationError as exc:
return False, exc.errors
def _emit_output_status_messages(self) -> None:
for message in self.output_manager.drain_status_messages():
if message != self._last_output_message:
self._last_output_message = message
self.status_message.emit(message)
def _record_render_metrics(self, started_at: float, finished_at: float) -> None:
self._last_render_duration_s = max(0.0, finished_at - started_at)
self._frames_rendered += 1
if self._render_window_started_at <= 0.0:
self._render_window_started_at = finished_at
self._render_window_count = 1
self._render_fps = 0.0
return
self._render_window_count += 1
elapsed = finished_at - self._render_window_started_at
if elapsed >= 0.5:
self._render_fps = self._render_window_count / elapsed
self._render_window_started_at = finished_at
self._render_window_count = 0
def _emit_preview_frames(
self,
live_frame: PreviewFrame,
next_frame: PreviewFrame | None,
timestamp: float,
force_preview: bool,
) -> None:
if self._preview_emit_due(self._last_live_preview_emit_time_s, timestamp, force_preview):
self._last_live_preview_emit_time_s = timestamp
self.frame_ready.emit(live_frame)
if next_frame is not None and self._preview_emit_due(self._last_next_preview_emit_time_s, timestamp, force_preview):
self._last_next_preview_emit_time_s = timestamp
self.next_frame_ready.emit(next_frame)
def _preview_emit_due(self, last_emit_time_s: float | None, timestamp: float, force_preview: bool) -> bool:
if force_preview or last_emit_time_s is None:
return True
if float(timestamp) <= last_emit_time_s:
return True
return (float(timestamp) - last_emit_time_s) >= self._preview_emit_interval_s()
def _preview_emit_interval_s(self) -> float:
try:
backend = self.output_manager.active_backend()
live_output_active = self.output_manager.output_enabled and backend.supports_live_output
except Exception:
live_output_active = False
preview_fps = PRIORITIZED_PREVIEW_FPS if live_output_active else DEFAULT_PREVIEW_FPS
return 1.0 / max(1.0, preview_fps)

32
app/core/diagnostics.py Normal file
View File

@@ -0,0 +1,32 @@
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class RealtimeDiagnostics:
backend_id: str
backend_name: str
output_enabled: bool
worker_running: bool
target_output_fps: float
render_fps: float
send_fps: float
last_render_time_ms: float
last_send_time_ms: float
frames_rendered: int
frames_submitted: int
frames_sent: int
stale_frame_drops: int
send_failures: int
packets_last_frame: int
devices_last_frame: int
packets_sent_total: int
last_output_message: str = ""
send_budget_misses: int = 0
last_schedule_slip_ms: float = 0.0
controller_fps: float | None = None
controller_live_devices: int = 0
controller_sampled_devices: int = 0
controller_total_devices: int = 0
controller_source: str = ""

89
app/core/geometry.py Normal file
View File

@@ -0,0 +1,89 @@
from __future__ import annotations
from app.config.models import SegmentConfig, TileConfig
from app.core.types import clamp
NormalizedPoint = tuple[float, float]
NormalizedInsets = tuple[float, float]
_SEGMENT_SIDE_ALIASES = {
"left": "left",
"right": "right",
"top": "top",
"bottom": "bottom",
"l": "left",
"r": "right",
"t": "top",
"b": "bottom",
}
def segment_side(tile: TileConfig, segment: SegmentConfig) -> str | None:
side = (segment.side or "").strip().lower()
if side in _SEGMENT_SIDE_ALIASES:
return _SEGMENT_SIDE_ALIASES[side]
width = max(0.001, tile.x1 - tile.x0)
height = max(0.001, tile.y1 - tile.y0)
delta_x = abs(segment.x1 - segment.x0) / width
delta_y = abs(segment.y1 - segment.y0) / height
mid_x = (segment.x0 + segment.x1) / 2.0
mid_y = (segment.y0 + segment.y1) / 2.0
if delta_x >= delta_y:
return "top" if mid_y <= tile.y0 + height * 0.5 else "bottom"
return "left" if mid_x <= tile.x0 + width * 0.5 else "right"
def segment_led_position(
tile: TileConfig,
segment: SegmentConfig,
amount: float,
*,
insets: NormalizedInsets = (0.0, 0.0),
apply_reverse: bool = False,
) -> NormalizedPoint:
amount = clamp(float(amount))
if apply_reverse and segment.reverse:
amount = 1.0 - amount
inset_x = clamp(float(insets[0]), 0.0, 0.49)
inset_y = clamp(float(insets[1]), 0.0, 0.49)
side = segment_side(tile, segment)
if side == "left":
return inset_x, inset_y + (1.0 - inset_y * 2.0) * amount
if side == "right":
return 1.0 - inset_x, inset_y + (1.0 - inset_y * 2.0) * amount
if side == "top":
return inset_x + (1.0 - inset_x * 2.0) * amount, inset_y
if side == "bottom":
return inset_x + (1.0 - inset_x * 2.0) * amount, 1.0 - inset_y
width = max(0.001, tile.x1 - tile.x0)
height = max(0.001, tile.y1 - tile.y0)
x_pos = segment.x0 + (segment.x1 - segment.x0) * amount
y_pos = segment.y0 + (segment.y1 - segment.y0) * amount
return (
clamp((x_pos - tile.x0) / width),
clamp((y_pos - tile.y0) / height),
)
def segment_led_positions(
tile: TileConfig,
segment: SegmentConfig,
*,
insets: NormalizedInsets = (0.0, 0.0),
) -> list[NormalizedPoint]:
count = max(1, segment.led_count)
return [
segment_led_position(
tile,
segment,
0.0 if count == 1 else index / (count - 1),
insets=insets,
apply_reverse=True,
)
for index in range(count)
]

125
app/core/pattern_compat.py Normal file
View File

@@ -0,0 +1,125 @@
from __future__ import annotations
from app.core.types import PatternParameters, clamp
SCAN_ANGLES: tuple[int, ...] = (0, 45, 90, 135, 180, 225, 270, 315)
_LEGACY_SCAN_IDS = {
"row_chase",
"row_scan",
"column_chase",
"column_scan",
"diagonal_scan",
}
_DEFAULT_SCAN_PARAMS = PatternParameters()
def nearest_scan_angle(value: float) -> int:
angle = int(round(float(value))) % 360
return min(SCAN_ANGLES, key=lambda candidate: min((candidate - angle) % 360, (angle - candidate) % 360))
def coerce_bool(value: object, default: bool = False) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
return default
def _flip_angle_horizontal(angle: int) -> int:
return (180 - angle) % 360
def _flip_angle_vertical(angle: int) -> int:
return (-angle) % 360
def normalize_pattern_request(
pattern_id: str,
params: PatternParameters | None = None,
rows: int | None = None,
cols: int | None = None,
) -> tuple[str, PatternParameters | None]:
if pattern_id == "pixel_sparkle":
if params is None:
return "strobe", None
payload = params.to_dict()
payload["strobe_mode"] = "random_pixels"
return "strobe", PatternParameters.from_dict(payload)
if pattern_id == "random_blocks":
return "sparkle", params
if pattern_id == "scan" and params is not None:
payload = params.to_dict()
angle = nearest_scan_angle(getattr(params, "angle", payload.get("angle", 0.0)))
if coerce_bool(getattr(params, "flip_horizontal", False)):
angle = _flip_angle_horizontal(angle)
if coerce_bool(getattr(params, "flip_vertical", False)):
angle = _flip_angle_vertical(angle)
payload["angle"] = angle
payload.pop("band_thickness", None)
payload.pop("flip_horizontal", None)
payload.pop("flip_vertical", None)
return "scan", PatternParameters.from_dict(payload)
if pattern_id not in _LEGACY_SCAN_IDS:
return pattern_id, params
payload = params.to_dict() if params is not None else {}
if params is not None:
payload["flip_horizontal"] = getattr(params, "flip_horizontal", payload.get("flip_horizontal", False))
payload["flip_vertical"] = getattr(params, "flip_vertical", payload.get("flip_vertical", False))
payload["band_thickness"] = getattr(params, "band_thickness", payload.get("band_thickness", params.on_width))
direction = str(payload.get("direction", "left_to_right"))
diagonal_mode = str(payload.get("diagonal_scan_mode", "line"))
def preferred_value(key: str, legacy_value):
current_value = payload.get(key, getattr(_DEFAULT_SCAN_PARAMS, key))
default_value = getattr(_DEFAULT_SCAN_PARAMS, key)
return legacy_value if current_value == default_value else current_value
if pattern_id in {"row_chase", "row_scan"}:
angle = 90 if direction != "bottom_to_top" else 270
on_width = float(preferred_value("on_width", payload.get("block_size", 1.0)))
off_width = float(preferred_value("off_width", 0.0))
scan_style = str(preferred_value("scan_style", "line"))
elif pattern_id in {"column_chase", "column_scan"}:
angle = 0 if direction != "right_to_left" else 180
on_width = float(preferred_value("on_width", payload.get("block_size", 1.0)))
off_width = float(preferred_value("off_width", 0.0))
scan_style = str(preferred_value("scan_style", "line"))
else:
angle = {
"left_to_right": 315,
"right_to_left": 135,
"top_to_bottom": 315,
"bottom_to_top": 135,
}.get(direction, 315)
scan_style = "bands" if diagonal_mode == "bands" else "line"
scan_style = str(preferred_value("scan_style", scan_style))
on_width = float(preferred_value("on_width", 2.0 if scan_style == "bands" else 0.5))
off_width = float(preferred_value("off_width", 1.5 if scan_style == "bands" else 0.0))
angle = nearest_scan_angle(preferred_value("angle", angle))
if coerce_bool(preferred_value("flip_horizontal", False)):
angle = _flip_angle_horizontal(angle)
if coerce_bool(preferred_value("flip_vertical", False)):
angle = _flip_angle_vertical(angle)
payload["angle"] = angle
payload["scan_style"] = str(scan_style)
payload["on_width"] = clamp(float(on_width), 0.1, 2.0)
payload["off_width"] = clamp(float(off_width), 0.0, 2.0)
payload.pop("band_thickness", None)
payload.pop("flip_horizontal", None)
payload.pop("flip_vertical", None)
return "scan", PatternParameters.from_dict(payload)

324
app/core/pattern_engine.py Normal file
View File

@@ -0,0 +1,324 @@
from __future__ import annotations
import math
import time
from typing import cast
from app.config.models import InfinityMirrorConfig, SegmentConfig, TileConfig
from app.core.colors import choose_pair, darken, label_contrast, sample_palette
from app.core.geometry import segment_led_positions
from app.core.pattern_compat import normalize_pattern_request
from app.core.types import PatternParameters, PreviewFrame, RGBColor, TileFrame, TilePatternSample
from app.patterns.base import PatternContext, PatternDescriptor, PatternRegistry
from app.patterns.builtin import built_in_patterns
class PatternEngine:
def __init__(self) -> None:
self.registry = PatternRegistry(built_in_patterns())
self._last_time = time.perf_counter()
self._previous_samples: dict[str, TilePatternSample] = {}
def descriptors(self) -> list[PatternDescriptor]:
return self.registry.descriptors()
def render_frame(
self,
config: InfinityMirrorConfig,
pattern_id: str,
params: PatternParameters,
utility_mode: str = "none",
selected_tile_id: str | None = None,
timestamp: float | None = None,
tempo_bpm: float = 60.0,
tempo_phase: float | None = None,
) -> PreviewFrame:
params = params.sanitized()
pattern_id, normalized_params = normalize_pattern_request(
pattern_id,
params,
rows=config.logical_display.rows,
cols=config.logical_display.cols,
)
if normalized_params is not None:
params = normalized_params
timestamp = time.perf_counter() if timestamp is None else timestamp
if tempo_phase is None:
tempo_phase = timestamp * max(0.05, float(tempo_bpm) / 60.0)
delta = max(1.0 / 120.0, timestamp - self._last_time)
self._last_time = timestamp
if utility_mode != "none":
temporal_profile = "direct"
samples = self._render_utility_frame(config, params, utility_mode, selected_tile_id, timestamp)
else:
descriptor = self.registry.get(pattern_id).descriptor
temporal_profile = descriptor.temporal_profile
samples = self.registry.get(pattern_id).render(
PatternContext(
config=config,
params=params,
time_s=timestamp,
tempo_bpm=tempo_bpm,
tempo_phase=tempo_phase,
)
)
blend_alpha = 1.0 - math.exp(-delta * (10.0 * (1.0 - params.fade) + 1.0))
frame_tiles: dict[str, TileFrame] = {}
for tile in config.sorted_tiles():
sample = samples.get(tile.tile_id, self._fallback_sample(tile))
blended = sample if temporal_profile == "direct" else self._blend_sample(tile.tile_id, sample, blend_alpha)
self._previous_samples[tile.tile_id] = blended
brightness = params.brightness * tile.brightness_factor if tile.enabled else 0.04
fill = blended.fill_color.scaled(brightness)
glow = blended.glow_color.scaled(brightness)
rim = blended.rim_color.scaled(brightness)
metadata = self._scale_metadata(blended.metadata, brightness)
frame_tiles[tile.tile_id] = TileFrame(
tile_id=tile.tile_id,
row=tile.row,
col=tile.col,
fill_color=fill,
glow_color=glow,
rim_color=rim,
label_color=label_contrast(fill),
intensity=blended.intensity,
enabled=tile.enabled,
led_pixels=self._build_led_pixels(
tile,
fill,
rim,
timestamp,
max(0.05, tempo_bpm / 60.0),
config.defaults.tile_behavior,
metadata,
),
metadata=metadata,
)
if params.color_mode == "palette":
background_start = darken(sample_palette(params.palette, 0.78), 0.86)
background_end = darken(sample_palette(params.palette, 0.22), 0.94)
else:
primary, secondary = choose_pair(
params.color_mode,
params.primary_color,
params.secondary_color,
params.palette,
0.22,
)
background_start = darken(secondary.mix(primary, 0.18), 0.78)
background_end = darken(primary.mix(secondary, 0.12), 0.92)
return PreviewFrame(
timestamp=timestamp,
pattern_id=pattern_id,
utility_mode=utility_mode,
background_start=background_start,
background_end=background_end,
tiles=frame_tiles,
)
def _blend_sample(self, tile_id: str, sample: TilePatternSample, blend_alpha: float) -> TilePatternSample:
previous = self._previous_samples.get(tile_id, sample)
return TilePatternSample(
fill_color=previous.fill_color.mix(sample.fill_color, blend_alpha),
glow_color=previous.glow_color.mix(sample.glow_color, blend_alpha),
rim_color=previous.rim_color.mix(sample.rim_color, blend_alpha),
label_color=previous.label_color.mix(sample.label_color, blend_alpha),
intensity=previous.intensity + (sample.intensity - previous.intensity) * blend_alpha,
metadata=sample.metadata,
)
def _fallback_sample(self, tile: TileConfig) -> TilePatternSample:
color = RGBColor(0.08, 0.12, 0.16)
return TilePatternSample(
fill_color=color,
glow_color=color.mix(RGBColor.white(), 0.12),
rim_color=color.mix(RGBColor.white(), 0.22),
label_color=RGBColor.white(),
intensity=0.2,
)
def _scale_metadata(self, metadata: dict[str, object], brightness: float) -> dict[str, object]:
if not metadata:
return {}
scaled = dict(metadata)
diagonal_split = metadata.get("diagonal_split")
if isinstance(diagonal_split, dict):
color_a = diagonal_split.get("color_a")
color_b = diagonal_split.get("color_b")
scaled_a = color_a.scaled(brightness) if isinstance(color_a, RGBColor) else RGBColor.black()
scaled_b = color_b.scaled(brightness) if isinstance(color_b, RGBColor) else RGBColor.black()
scaled["diagonal_split"] = {
**diagonal_split,
"color_a": scaled_a,
"color_b": scaled_b,
}
led_pixels = metadata.get("led_pixels")
if isinstance(led_pixels, dict):
scaled_led_pixels: dict[str, list[RGBColor]] = {}
for segment_name, segment_colors in led_pixels.items():
if not isinstance(segment_name, str) or not isinstance(segment_colors, list):
continue
scaled_segment: list[RGBColor] = []
for color in segment_colors:
if not isinstance(color, RGBColor):
continue
scaled_segment.append(color.scaled(brightness))
scaled_led_pixels[segment_name] = scaled_segment
scaled["led_pixels"] = scaled_led_pixels
return scaled
def _build_led_pixels(
self,
tile: TileConfig,
fill_color: RGBColor,
rim_color: RGBColor,
timestamp: float,
tempo_hz: float,
tile_behavior: str,
metadata: dict[str, object],
) -> dict[str, list[RGBColor]]:
led_pixels = metadata.get("led_pixels")
if isinstance(led_pixels, dict):
return self._build_metadata_led_pixels(tile, cast(dict[str, list[RGBColor]], led_pixels))
diagonal_split = metadata.get("diagonal_split")
if isinstance(diagonal_split, dict):
return self._build_diagonal_split_pixels(tile, diagonal_split)
pixels: dict[str, list[RGBColor]] = {}
if tile_behavior == "solid_color_per_tile":
for segment in tile.segments:
pixels[segment.name] = [fill_color for _ in range(segment.led_count)]
return pixels
for segment in tile.segments:
segment_pixels: list[RGBColor] = []
for index in range(segment.led_count):
pulse = 0.04 * math.sin((timestamp * tempo_hz * 3.0) + index * 0.15 + tile.row * 0.9 + tile.col * 0.55)
amount = 0.05 + max(0.0, pulse)
segment_pixels.append(fill_color.mix(rim_color, amount))
if segment.reverse:
segment_pixels.reverse()
pixels[segment.name] = segment_pixels
return pixels
def _build_metadata_led_pixels(self, tile: TileConfig, led_pixels: dict[str, list[RGBColor]]) -> dict[str, list[RGBColor]]:
pixels: dict[str, list[RGBColor]] = {}
for segment in tile.segments:
colors = list(led_pixels.get(segment.name, []))
if len(colors) < segment.led_count:
colors.extend([RGBColor.black()] * (segment.led_count - len(colors)))
pixels[segment.name] = colors[: segment.led_count]
return pixels
def _build_diagonal_split_pixels(self, tile: TileConfig, diagonal_split: dict[str, object]) -> dict[str, list[RGBColor]]:
orientation = str(diagonal_split.get("orientation", "slash"))
color_a = diagonal_split.get("color_a")
color_b = diagonal_split.get("color_b")
if not isinstance(color_a, RGBColor) or not isinstance(color_b, RGBColor):
return {}
pixels: dict[str, list[RGBColor]] = {}
for segment in tile.segments:
segment_pixels: list[RGBColor] = []
for x_pos, y_pos in segment_led_positions(tile, segment):
if orientation == "backslash":
color = color_a if y_pos <= x_pos else color_b
else:
color = color_a if y_pos <= 1.0 - x_pos else color_b
segment_pixels.append(color)
pixels[segment.name] = segment_pixels
return pixels
def _render_utility_frame(
self,
config: InfinityMirrorConfig,
params: PatternParameters,
utility_mode: str,
selected_tile_id: str | None,
timestamp: float,
) -> dict[str, TilePatternSample]:
tiles = config.sorted_tiles()
blank = TilePatternSample(
fill_color=RGBColor.black(),
glow_color=RGBColor(0.04, 0.06, 0.07),
rim_color=RGBColor(0.08, 0.1, 0.12),
label_color=RGBColor.white(),
intensity=0.1,
)
if utility_mode == "blackout":
return {tile.tile_id: blank for tile in tiles}
if utility_mode == "identify":
active_index = int(timestamp * 2.0) % max(1, len(tiles))
result = {tile.tile_id: blank for tile in tiles}
active = tiles[active_index]
result[active.tile_id] = TilePatternSample(
fill_color=RGBColor.white().scaled(0.9),
glow_color=RGBColor(1.0, 0.85, 0.4),
rim_color=RGBColor.white(),
label_color=RGBColor.black(),
intensity=1.0,
metadata={"active": True},
)
return result
if utility_mode == "single_tile":
result = {tile.tile_id: blank for tile in tiles}
if selected_tile_id and selected_tile_id in result:
result[selected_tile_id] = TilePatternSample(
fill_color=RGBColor.white(),
glow_color=RGBColor(0.9, 0.96, 1.0),
rim_color=RGBColor.white(),
label_color=RGBColor.black(),
intensity=1.0,
metadata={"active": True},
)
return result
if utility_mode == "row_test":
palette = [sample_palette(params.palette, row / max(1, config.logical_display.rows - 1)) for row in range(config.logical_display.rows)]
return {
tile.tile_id: TilePatternSample(
fill_color=palette[tile.row - 1].scaled(0.95),
glow_color=palette[tile.row - 1].mix(RGBColor.white(), 0.2),
rim_color=palette[tile.row - 1].mix(RGBColor.white(), 0.3),
label_color=RGBColor.black() if tile.row == 1 else RGBColor.white(),
intensity=0.9,
)
for tile in tiles
}
if utility_mode == "column_test":
palette = [sample_palette(params.palette, col / max(1, config.logical_display.cols - 1)) for col in range(config.logical_display.cols)]
return {
tile.tile_id: TilePatternSample(
fill_color=palette[tile.col - 1].scaled(0.95),
glow_color=palette[tile.col - 1].mix(RGBColor.white(), 0.2),
rim_color=palette[tile.col - 1].mix(RGBColor.white(), 0.3),
label_color=RGBColor.white(),
intensity=0.9,
)
for tile in tiles
}
if utility_mode == "checker_test":
return {
tile.tile_id: TilePatternSample(
fill_color=(RGBColor.white() if (tile.row + tile.col) % 2 == 0 else RGBColor(0.05, 0.08, 0.1)),
glow_color=RGBColor(0.8, 0.9, 1.0) if (tile.row + tile.col) % 2 == 0 else RGBColor(0.08, 0.1, 0.14),
rim_color=RGBColor.white() if (tile.row + tile.col) % 2 == 0 else RGBColor(0.12, 0.16, 0.18),
label_color=RGBColor.black() if (tile.row + tile.col) % 2 == 0 else RGBColor.white(),
intensity=1.0 if (tile.row + tile.col) % 2 == 0 else 0.15,
)
for tile in tiles
}
return {tile.tile_id: blank for tile in tiles}

90
app/core/presets.py Normal file
View File

@@ -0,0 +1,90 @@
from __future__ import annotations
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
import json
import re
from .types import PatternParameters
@dataclass
class PresetRecord:
name: str
pattern_id: str
parameters: dict
brightness: float
palette: str
created_at: str
tempo_bpm: float | None = None
@classmethod
def create(cls, name: str, pattern_id: str, params: PatternParameters, tempo_bpm: float | None = None) -> "PresetRecord":
return cls(
name=name,
pattern_id=pattern_id,
parameters=params.to_dict(),
brightness=params.brightness,
palette=params.palette,
created_at=datetime.utcnow().isoformat(timespec="seconds"),
tempo_bpm=tempo_bpm,
)
class PresetStore:
def __init__(self, root: str | Path) -> None:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
def list_presets(self) -> list[PresetRecord]:
presets: list[PresetRecord] = []
for path in sorted(self.root.glob("*.json")):
try:
payload = json.loads(path.read_text(encoding="utf-8"))
presets.append(PresetRecord(**payload))
except (OSError, json.JSONDecodeError, TypeError):
continue
return presets
def save(self, record: PresetRecord) -> Path:
path = self.root / f"{slugify(record.name)}.json"
path.write_text(json.dumps(asdict(record), indent=2), encoding="utf-8")
return path
def load(self, name: str) -> PresetRecord:
path = self.root / f"{slugify(name)}.json"
payload = json.loads(path.read_text(encoding="utf-8"))
return PresetRecord(**payload)
def delete(self, name: str) -> None:
path = self.root / f"{slugify(name)}.json"
if path.exists():
path.unlink()
def ensure_seed_presets(self) -> None:
if any(self.root.glob("*.json")):
return
seeds = [
PresetRecord.create(
"Afterhours Pulse",
"center_pulse",
PatternParameters(palette="Afterhours", color_mode="palette", fade=0.28),
),
PresetRecord.create(
"Laser Chase",
"scan_dual",
PatternParameters(palette="Laser Club", block_size=1.6, direction="left_to_right"),
),
PresetRecord.create(
"Heat Breathing",
"breathing",
PatternParameters(palette="Warehouse Heat", color_mode="palette", fade=0.7),
),
]
for preset in seeds:
self.save(preset)
def slugify(value: str) -> str:
return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") or "preset"

342
app/core/types.py Normal file
View File

@@ -0,0 +1,342 @@
from __future__ import annotations
from dataclasses import dataclass, field, replace
from typing import Any
def clamp(value: float, minimum: float = 0.0, maximum: float = 1.0) -> float:
return max(minimum, min(maximum, value))
_SCAN_ANGLES = (0, 45, 90, 135, 180, 225, 270, 315)
def _nearest_scan_angle(value: float) -> int:
angle = int(round(float(value))) % 360
return min(_SCAN_ANGLES, key=lambda candidate: min((candidate - angle) % 360, (angle - candidate) % 360))
def _coerce_bool(value: object, default: bool = False) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
return default
@dataclass(frozen=True)
class RGBColor:
r: float
g: float
b: float
def clamped(self) -> "RGBColor":
return RGBColor(clamp(self.r), clamp(self.g), clamp(self.b))
def scaled(self, factor: float) -> "RGBColor":
return RGBColor(self.r * factor, self.g * factor, self.b * factor).clamped()
def mix(self, other: "RGBColor", amount: float) -> "RGBColor":
amount = clamp(amount)
return RGBColor(
self.r + (other.r - self.r) * amount,
self.g + (other.g - self.g) * amount,
self.b + (other.b - self.b) * amount,
).clamped()
def to_8bit_tuple(self) -> tuple[int, int, int]:
value = self.clamped()
return int(value.r * 255), int(value.g * 255), int(value.b * 255)
def to_hex(self) -> str:
red, green, blue = self.to_8bit_tuple()
return f"#{red:02X}{green:02X}{blue:02X}"
@staticmethod
def black() -> "RGBColor":
return RGBColor(0.0, 0.0, 0.0)
@staticmethod
def white() -> "RGBColor":
return RGBColor(1.0, 1.0, 1.0)
@dataclass
class PatternParameters:
speed: float = 0.45
brightness: float = 1.0
fade: float = 0.35
tempo_multiplier: float = 1.0
direction: str = "left_to_right"
checker_mode: str = "classic"
scan_style: str = "line"
angle: float = 0.0
on_width: float = 1.0
off_width: float = 1.0
band_thickness: float = 0.8
flip_horizontal: bool = False
flip_vertical: bool = False
strobe_mode: str = "global"
stopwatch_mode: str = "sync"
color_mode: str = "dual"
primary_color: str = "#4D7CFF"
secondary_color: str = "#0E1630"
palette: str = "Laser Club"
symmetry: str = "none"
center_pulse_mode: str = "expand"
step_size: float = 1.0
block_size: float = 1.0
pixel_group_size: float = 1.0
strobe_duty_cycle: float = 0.5
randomness: float = 0.35
def clone(self) -> "PatternParameters":
return replace(self)
def sanitized(self) -> "PatternParameters":
return PatternParameters(
speed=max(0.01, self.speed),
brightness=clamp(self.brightness, 0.0, 2.0),
fade=clamp(self.fade),
tempo_multiplier=clamp(self.tempo_multiplier, 0.25, 8.0),
direction=self.direction,
checker_mode=self.checker_mode,
scan_style=self.scan_style if self.scan_style in {"line", "bands"} else "line",
angle=_nearest_scan_angle(self.angle),
on_width=clamp(self.on_width, 0.1, 2.0),
off_width=clamp(self.off_width, 0.0, 2.0),
band_thickness=clamp(self.band_thickness, 0.1, 2.0),
flip_horizontal=bool(self.flip_horizontal),
flip_vertical=bool(self.flip_vertical),
strobe_mode=self.strobe_mode,
stopwatch_mode=self.stopwatch_mode,
color_mode=self.color_mode,
primary_color=self.primary_color,
secondary_color=self.secondary_color,
palette=self.palette,
symmetry=self.symmetry,
center_pulse_mode=self.center_pulse_mode if self.center_pulse_mode in {"expand", "reverse", "outline", "outline_reverse"} else "expand",
step_size=max(0.1, self.step_size),
block_size=max(0.1, self.block_size),
pixel_group_size=max(1.0, min(5.0, round(self.pixel_group_size))),
strobe_duty_cycle=clamp(self.strobe_duty_cycle, 0.02, 0.98),
randomness=clamp(self.randomness, 0.0, 1.5),
)
def to_dict(self) -> dict[str, Any]:
return {
"brightness": self.brightness,
"fade": self.fade,
"tempo_multiplier": self.tempo_multiplier,
"direction": self.direction,
"checker_mode": self.checker_mode,
"scan_style": self.scan_style,
"angle": self.angle,
"on_width": self.on_width,
"off_width": self.off_width,
"strobe_mode": self.strobe_mode,
"stopwatch_mode": self.stopwatch_mode,
"color_mode": self.color_mode,
"primary_color": self.primary_color,
"secondary_color": self.secondary_color,
"palette": self.palette,
"symmetry": self.symmetry,
"center_pulse_mode": self.center_pulse_mode,
"block_size": self.block_size,
"pixel_group_size": self.pixel_group_size,
"strobe_duty_cycle": self.strobe_duty_cycle,
"randomness": self.randomness,
}
@classmethod
def from_dict(cls, payload: dict[str, Any]) -> "PatternParameters":
return cls(
speed=float(payload.get("speed", cls.speed)),
brightness=float(payload.get("brightness", cls.brightness)),
fade=float(payload.get("fade", cls.fade)),
tempo_multiplier=float(payload.get("tempo_multiplier", cls.tempo_multiplier)),
direction=str(payload.get("direction", cls.direction)),
checker_mode=str(payload.get("checker_mode", cls.checker_mode)),
scan_style=str(payload.get("scan_style", payload.get("diagonal_scan_mode", cls.scan_style))),
angle=float(payload.get("angle", cls.angle)),
on_width=float(payload.get("on_width", cls.on_width)),
off_width=float(payload.get("off_width", cls.off_width)),
band_thickness=float(payload.get("band_thickness", payload.get("on_width", cls.band_thickness))),
flip_horizontal=_coerce_bool(payload.get("flip_horizontal", cls.flip_horizontal)),
flip_vertical=_coerce_bool(payload.get("flip_vertical", cls.flip_vertical)),
strobe_mode=str(payload.get("strobe_mode", cls.strobe_mode)),
stopwatch_mode=str(payload.get("stopwatch_mode", cls.stopwatch_mode)),
color_mode=str(payload.get("color_mode", cls.color_mode)),
primary_color=str(payload.get("primary_color", cls.primary_color)),
secondary_color=str(payload.get("secondary_color", cls.secondary_color)),
palette=str(payload.get("palette", cls.palette)),
symmetry=str(payload.get("symmetry", cls.symmetry)),
center_pulse_mode=str(payload.get("center_pulse_mode", cls.center_pulse_mode)),
step_size=float(payload.get("step_size", cls.step_size)),
block_size=float(payload.get("block_size", cls.block_size)),
pixel_group_size=float(payload.get("pixel_group_size", cls.pixel_group_size)),
strobe_duty_cycle=float(payload.get("strobe_duty_cycle", cls.strobe_duty_cycle)),
randomness=float(payload.get("randomness", cls.randomness)),
).sanitized()
@dataclass
class SceneState:
pattern_id: str = "solid"
params: PatternParameters = field(default_factory=PatternParameters)
def clone(self) -> "SceneState":
return SceneState(
pattern_id=self.pattern_id,
params=self.params.clone(),
)
@dataclass
class TilePatternSample:
fill_color: RGBColor
glow_color: RGBColor
rim_color: RGBColor
label_color: RGBColor
intensity: float = 1.0
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class TileFrame:
tile_id: str
row: int
col: int
fill_color: RGBColor
glow_color: RGBColor
rim_color: RGBColor
label_color: RGBColor
intensity: float
enabled: bool
led_pixels: dict[str, list[RGBColor]] = field(default_factory=dict)
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class PreviewFrame:
timestamp: float
pattern_id: str
utility_mode: str
background_start: RGBColor
background_end: RGBColor
tiles: dict[str, TileFrame]
@dataclass
class SceneTransition:
started_at: float
duration_s: float
source_frame: PreviewFrame
target_scene: SceneState
def _blend_led_pixel_groups(
source_pixels: dict[str, list[RGBColor]],
target_pixels: dict[str, list[RGBColor]],
amount: float,
) -> dict[str, list[RGBColor]]:
if not source_pixels and not target_pixels:
return {}
blended: dict[str, list[RGBColor]] = {}
black = RGBColor.black()
for segment_name in sorted(set(source_pixels) | set(target_pixels)):
source_segment = source_pixels.get(segment_name, [])
target_segment = target_pixels.get(segment_name, [])
count = max(len(source_segment), len(target_segment))
segment_colors: list[RGBColor] = []
for index in range(count):
source_color = source_segment[index] if index < len(source_segment) else black
target_color = target_segment[index] if index < len(target_segment) else black
segment_colors.append(source_color.mix(target_color, amount))
blended[segment_name] = segment_colors
return blended
def _blend_metadata(source: dict[str, Any], target: dict[str, Any], amount: float) -> dict[str, Any]:
if not source and not target:
return {}
blended = {
key: value
for key, value in source.items()
if key not in {"diagonal_split", "led_pixels"}
}
for key, value in target.items():
if key not in {"diagonal_split", "led_pixels"}:
blended[key] = value
source_split = source.get("diagonal_split")
target_split = target.get("diagonal_split")
if isinstance(source_split, dict) or isinstance(target_split, dict):
source_split = source_split if isinstance(source_split, dict) else {}
target_split = target_split if isinstance(target_split, dict) else {}
source_a = source_split.get("color_a", RGBColor.black())
source_b = source_split.get("color_b", RGBColor.black())
target_a = target_split.get("color_a", RGBColor.black())
target_b = target_split.get("color_b", RGBColor.black())
if isinstance(source_a, RGBColor) and isinstance(source_b, RGBColor) and isinstance(target_a, RGBColor) and isinstance(target_b, RGBColor):
blended["diagonal_split"] = {
"orientation": str(target_split.get("orientation", source_split.get("orientation", "slash"))),
"color_a": source_a.mix(target_a, amount),
"color_b": source_b.mix(target_b, amount),
}
source_pixels = source.get("led_pixels")
target_pixels = target.get("led_pixels")
if isinstance(source_pixels, dict) or isinstance(target_pixels, dict):
source_pixels = source_pixels if isinstance(source_pixels, dict) else {}
target_pixels = target_pixels if isinstance(target_pixels, dict) else {}
blended["led_pixels"] = _blend_led_pixel_groups(source_pixels, target_pixels, amount)
return blended
def blend_preview_frames(source: PreviewFrame, target: PreviewFrame, amount: float) -> PreviewFrame:
amount = clamp(amount)
blended_tiles: dict[str, TileFrame] = {}
for tile_id in sorted(set(source.tiles) | set(target.tiles)):
source_tile = source.tiles.get(tile_id)
target_tile = target.tiles.get(tile_id)
if source_tile is None and target_tile is not None:
blended_tiles[tile_id] = target_tile
continue
if target_tile is None and source_tile is not None:
blended_tiles[tile_id] = source_tile
continue
if source_tile is None or target_tile is None:
continue
blended_tiles[tile_id] = TileFrame(
tile_id=target_tile.tile_id,
row=target_tile.row,
col=target_tile.col,
fill_color=source_tile.fill_color.mix(target_tile.fill_color, amount),
glow_color=source_tile.glow_color.mix(target_tile.glow_color, amount),
rim_color=source_tile.rim_color.mix(target_tile.rim_color, amount),
label_color=source_tile.label_color.mix(target_tile.label_color, amount),
intensity=source_tile.intensity + (target_tile.intensity - source_tile.intensity) * amount,
enabled=source_tile.enabled and target_tile.enabled,
led_pixels=_blend_led_pixel_groups(source_tile.led_pixels, target_tile.led_pixels, amount),
metadata=_blend_metadata(source_tile.metadata, target_tile.metadata, amount),
)
return PreviewFrame(
timestamp=target.timestamp,
pattern_id=target.pattern_id if amount >= 0.5 else source.pattern_id,
utility_mode=target.utility_mode if amount >= 0.5 else source.utility_mode,
background_start=source.background_start.mix(target.background_start, amount),
background_end=source.background_end.mix(target.background_end, amount),
tiles=blended_tiles,
)