Compare commits

1 Commits

Author SHA1 Message Date
3eb702a762 Add Infinity_Vis_1 performance core 2026-04-17 01:08:43 +02:00
20 changed files with 1850 additions and 0 deletions

View File

@@ -0,0 +1,44 @@
# Infinity_Vis_1 Architecture
## Design goal
Make the render path cheaper than the current application by replacing
object-heavy frame construction with a compact, controller-oriented runtime.
## High-level layers
1. `mapping_xml.py`
Parses and writes the existing XML mapping format.
2. `models.py`
Small `slots=True` dataclasses for tiles, segments, controllers, scenes, and frames.
3. `patterns.py`
Stateless pattern functions that return tile colors only.
4. `renderer.py`
Owns precomputed controller routes and reusable `bytearray` payloads.
5. `engine.py`
Separates output cadence from preview cadence and handles delta sending.
6. `output/ddp.py`
Sends already-built payloads as DDP packets.
7. `presets.py`
JSON preset persistence.
8. `benchmark.py`
Simple measurable entry point for render throughput.
## Main performance choices
- Render tile colors first, not rich nested preview objects.
- Reuse controller payload buffers every frame.
- Precompute segment byte offsets once at startup.
- Keep the runtime pure-stdlib for the new core.
- Keep preview as a consumer of a compact frame snapshot, not as the center of the engine.
- Decouple preview timing from hardware output timing so UI cost cannot directly cap live output.
- Skip unchanged controller payloads while still sending periodic keepalives.
## What is intentionally not in phase 1
- Full Qt UI parity
- Full preview painter parity
- WLED discovery UI port
- Advanced transition compositing parity
Those belong in phase 2, after the faster runtime is proven and benchmarked.

View File

@@ -0,0 +1,124 @@
# Current Feature Inventory
This inventory was collected from the current `app/` codebase and is the scope
target for the clean rebuild.
## Core control flow
- Mapping/project open, save, and save-as
- Startup auto-load of sample mapping
- Live scene and Next scene
- FOH mode with `Go` and `Fade Go`
- Tempo transport and transition time control
- Utility modes such as blackout and single-tile white test
## Pattern system
Current built-in patterns:
- `solid`
- `checker`
- `row_gradient`
- `column_gradient`
- `center_pulse`
- `sparkle`
- `breathing`
- `wave_line`
- `scan`
- `arrow`
- `scan_dual`
- `sweep`
- `saw`
- `two_dots`
- `strobe`
- `stopwatch`
- `snake`
Pattern parameter surface in the current app includes:
- brightness
- fade / smoothing
- tempo multiplier
- direction
- checker mode
- scan style and angle
- on/off width
- band thickness
- flip horizontal / vertical
- strobe mode and duty cycle
- stopwatch mode
- color mode
- primary / secondary colors
- palette
- symmetry
- center pulse mode
- step size
- block size
- pixel group size
- randomness
## Preview and UI
- Main desktop Qt window
- Pattern panel
- Preset browser
- Tile preview / technical preview / LEDs-only preview
- Fullscreen preview window
- Selected tile panel
- Utility controls
- Output diagnostics panel
## Mapping and editing
- XML mapping load/save/validation
- Tile table editing
- Segment table editing
- Raw XML editor
- Calibration and enable flags per tile
- Controller IP / host / name / MAC metadata persistence
## WLED-assisted mapping
- Network scan for WLED devices
- Device discovery via `/json/info` and `/json`
- Per-device identify action
- Tile assignment workflow inside Mapping Settings
- Immediate persistence back to the mapping file
## Presets
- JSON-backed preset storage
- Seed presets on first run
- Save, load, and delete presets
## Output
- Preview backend
- DDP backend for WLED
- Art-Net backend seam
- Output enable/disable
- Output FPS target
- Render FPS / Send FPS / Output health diagnostics
- Controller-side FPS polling for WLED live mode
## Performance pain points in the current code
- A lot of per-frame object churn:
- `PreviewFrame`
- `TileFrame`
- `TilePatternSample`
- many `RGBColor` instances
- nested `dict[str, list[RGBColor]]` payloads
- Per-frame rebuilding of LED pixel lists for each segment
- Preview and output coordination still centered around one controller loop
- Multiple render engines and transition helpers duplicating state
- DDP payload generation starts from nested Python objects instead of a compact frame buffer
## Rebuild priorities for Infinity_Vis_1
- fixed-size runtime state
- lower per-frame allocation
- preview cadence separated from hardware output cadence
- controller-first payload generation
- shared-IP multi-tile controller topology as a first-class concept
- clean feature layering so UI, preview, render, and output can scale independently

View File

@@ -0,0 +1,67 @@
# Infinity_Vis_1 Migration Status
This file tracks the legacy feature surface against the clean rebuild so we can
rebuild deliberately instead of drifting into another tangled codebase.
## Core runtime
| Feature | Legacy app | Infinity_Vis_1 |
| --- | --- | --- |
| XML mapping load/save | Yes | Yes |
| Mapping validation | Yes | Yes |
| Shared-IP controller grouping | Yes | Yes |
| Tile-color pattern render path | Yes | Yes |
| Direct DDP payload generation | Yes | Yes |
| Output/preview timing split | Partial | Yes |
| Delta sending with keepalive | Partial | Yes |
| Benchmark tooling | Partial | Yes |
## Patterns
| Pattern | Legacy app | Infinity_Vis_1 |
| --- | --- | --- |
| solid | Yes | Yes |
| checker | Yes | Yes |
| row_gradient | Yes | Yes |
| column_gradient | Yes | Yes |
| center_pulse | Yes | Yes |
| sparkle | Yes | Yes |
| breathing | Yes | Yes |
| wave_line | Yes | Yes |
| scan | Yes | Yes |
| arrow | Yes | Yes |
| scan_dual | Yes | Yes |
| sweep | Yes | Yes |
| saw | Yes | Yes |
| two_dots | Yes | Yes |
| strobe | Yes | Yes |
| stopwatch | Yes | Yes |
| snake | Yes | Yes |
## Presets and project data
| Feature | Legacy app | Infinity_Vis_1 |
| --- | --- | --- |
| JSON preset storage | Yes | Yes |
| Project scene state | Yes | Yes |
| Controller metadata persistence | Yes | Yes |
| WLED mapping metadata in XML | Yes | Yes |
## UI and operator workflow
| Feature | Legacy app | Infinity_Vis_1 |
| --- | --- | --- |
| Desktop Qt main window | Yes | Not yet |
| Full preview painter | Yes | Not yet |
| FOH Go/Fade Go workflow | Yes | Not yet |
| Preset browser UI | Yes | Not yet |
| Selected tile tools | Yes | Not yet |
| Mapping Settings dialog | Yes | Not yet |
| WLED assisted mapping UI | Yes | Not yet |
| Output diagnostics panel | Yes | Not yet |
## Phase order
1. Prove the faster core with tests and benchmarks.
2. Rebuild scene transport and operator workflows on top of the new engine.
3. Rebuild the desktop UI as a thin shell over the new runtime instead of the old state-heavy controller loop.

72
Infinity_Vis_1/README.md Normal file
View File

@@ -0,0 +1,72 @@
# Infinity_Vis_1
Fresh restart of Infinity Vis with a smaller runtime surface and a lower-allocation render path.
This directory is intentionally separate from the legacy `app/` implementation.
The goal is to keep the old tool usable while rebuilding the next version on a
clean architecture.
## What is in here
- `FEATURE_INVENTORY.md`
Current feature surface collected from the existing application.
- `ARCHITECTURE.md`
The new runtime design and the main performance goals.
- `MIGRATION_STATUS.md`
Honest parity tracking between the legacy app and the clean rebuild.
- `infinity_vis_1/`
New core package with mapping I/O, patterns, fast renderer, presets, and DDP output.
- `tests/`
Basic verification for the new mapping and renderer core.
## Current focus
Phase 1 is the performance-oriented core, not the full Qt feature-parity UI yet.
That means this new codebase already covers:
- loading and saving the existing XML mapping format
- controller grouping for shared-IP multi-tile nodes
- pattern rendering with a low-allocation tile-color pipeline
- separate output and preview timing via a new runtime engine
- direct DDP payload generation
- delta sending with keepalive support for unchanged controller frames
- preset persistence
- benchmark tooling
## Run the benchmark
From the repo root:
```powershell
python -m Infinity_Vis_1.infinity_vis_1.benchmark --frames 600
python -m Infinity_Vis_1.infinity_vis_1.benchmark --mode engine --frames 600 --pattern solid
```
## Start the live DDP sender
```powershell
python -m Infinity_Vis_1.infinity_vis_1.live --mapping sample_data\infinity_mirror_mapping_clean.xml --pattern solid --fps 40
```
Example for a harder live test:
```powershell
python -m Infinity_Vis_1.infinity_vis_1.live --mapping sample_data\infinity_mirror_mapping_clean.xml --pattern strobe --fps 40 --strobe-mode global --no-delta
```
## Use the runtime core
```python
from Infinity_Vis_1.infinity_vis_1 import OutputSettings, RealtimeEngine, SceneState, load_mapping
mapping = load_mapping("sample_data/infinity_mirror_mapping_clean.xml")
engine = RealtimeEngine(mapping, scene=SceneState(), settings=OutputSettings(output_fps=40, preview_fps=12))
result = engine.tick()
```
## Run the new tests
```powershell
python -m unittest discover Infinity_Vis_1/tests
```

View File

@@ -0,0 +1,17 @@
"""Performance-oriented Infinity Vis restart."""
from .engine import OutputSettings, RealtimeEngine
from .mapping_xml import load_mapping, save_mapping
from .models import MappingSpec, PatternParams, SceneState
from .renderer import FastRenderer
__all__ = [
"FastRenderer",
"MappingSpec",
"OutputSettings",
"PatternParams",
"RealtimeEngine",
"SceneState",
"load_mapping",
"save_mapping",
]

View File

@@ -0,0 +1,82 @@
from __future__ import annotations
import argparse
from pathlib import Path
import time
from .engine import OutputSettings, RealtimeEngine
from .mapping_xml import load_mapping
from .models import PatternParams, SceneState
from .patterns import pattern_ids
from .renderer import FastRenderer
def main() -> None:
parser = argparse.ArgumentParser(description="Benchmark the Infinity_Vis_1 render core.")
parser.add_argument(
"--mapping",
type=Path,
default=Path(__file__).resolve().parents[2] / "sample_data" / "infinity_mirror_mapping_clean.xml",
help="Path to the XML mapping file.",
)
parser.add_argument("--frames", type=int, default=600, help="Frames per pattern.")
parser.add_argument("--pattern", type=str, default="", help="Benchmark a single pattern id.")
parser.add_argument(
"--mode",
type=str,
default="render",
choices=["render", "engine"],
help="Benchmark only the renderer or the new runtime engine loop.",
)
args = parser.parse_args()
mapping = load_mapping(args.mapping)
renderer = FastRenderer(mapping)
stats = renderer.stats()
patterns = [args.pattern] if args.pattern else pattern_ids()
print(f"Mapping: {mapping.name}")
print(f"Tiles: {stats.tile_count} | Controllers: {stats.controller_count} | LEDs: {stats.total_leds}")
for pattern_id in patterns:
scene = SceneState(pattern_id=pattern_id, params=PatternParams(), tempo_bpm=120.0)
if args.mode == "engine":
fps, preview_frames, changed_frames = _benchmark_engine(mapping, scene, max(1, args.frames))
print(
f"{pattern_id:16s} {fps:9.1f} engine fps "
f"| preview {preview_frames:4d} | changed controllers {changed_frames:4d}"
)
else:
fps = _benchmark_renderer(renderer, scene, max(1, args.frames))
print(f"{pattern_id:16s} {fps:9.1f} render fps")
def _benchmark_renderer(renderer: FastRenderer, scene: SceneState, frames: int) -> float:
start = time.perf_counter()
for frame_index in range(frames):
renderer.render(scene, timestamp=start + frame_index / 60.0)
elapsed = time.perf_counter() - start
return frames / elapsed if elapsed > 0 else 0.0
def _benchmark_engine(mapping, scene: SceneState, frames: int) -> tuple[float, int, int]:
engine = RealtimeEngine(
mapping,
scene=scene,
settings=OutputSettings(output_fps=60.0, preview_fps=12.0, delta_sending=True, keepalive_seconds=0.35),
)
start = time.perf_counter()
preview_frames = 0
changed_frames = 0
for frame_index in range(frames):
result = engine.tick(now=start + frame_index / 60.0)
if result.preview_snapshot is not None:
preview_frames += 1
changed_frames += len(result.changed_payloads)
elapsed = time.perf_counter() - start
fps = frames / elapsed if elapsed > 0 else 0.0
return fps, preview_frames, changed_frames
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,67 @@
from __future__ import annotations
from typing import Iterable
from .models import RGB
def clamp_byte(value: float) -> int:
return max(0, min(255, int(round(value))))
def rgb_from_hex(value: str) -> RGB:
raw = str(value or "").strip().lstrip("#")
if len(raw) != 6:
return (255, 255, 255)
try:
return (int(raw[0:2], 16), int(raw[2:4], 16), int(raw[4:6], 16))
except ValueError:
return (255, 255, 255)
def mix(a: RGB, b: RGB, amount: float) -> RGB:
amount = max(0.0, min(1.0, float(amount)))
return (
clamp_byte(a[0] + (b[0] - a[0]) * amount),
clamp_byte(a[1] + (b[1] - a[1]) * amount),
clamp_byte(a[2] + (b[2] - a[2]) * amount),
)
def scale(color: RGB, factor: float) -> RGB:
return (
clamp_byte(color[0] * factor),
clamp_byte(color[1] * factor),
clamp_byte(color[2] * factor),
)
def average(colors: Iterable[RGB]) -> RGB:
items = list(colors)
if not items:
return (0, 0, 0)
return (
clamp_byte(sum(color[0] for color in items) / len(items)),
clamp_byte(sum(color[1] for color in items) / len(items)),
clamp_byte(sum(color[2] for color in items) / len(items)),
)
PALETTES: dict[str, tuple[RGB, ...]] = {
"Laser Club": ((77, 124, 255), (14, 22, 48), (255, 42, 109), (0, 255, 192)),
"Afterhours": ((70, 30, 110), (15, 10, 35), (255, 90, 140), (255, 200, 120)),
"Warehouse Heat": ((255, 120, 30), (55, 5, 0), (255, 210, 80), (180, 30, 10)),
"Monochrome": ((255, 255, 255), (30, 30, 30)),
}
def palette_color(name: str, phase: float) -> RGB:
palette = PALETTES.get(name, PALETTES["Laser Club"])
if len(palette) == 1:
return palette[0]
phase = float(phase) % 1.0
scaled_phase = phase * len(palette)
index = int(scaled_phase) % len(palette)
next_index = (index + 1) % len(palette)
local = scaled_phase - int(scaled_phase)
return mix(palette[index], palette[next_index], local)

View File

@@ -0,0 +1,196 @@
from __future__ import annotations
from dataclasses import dataclass, field
import time
from .models import FrameSnapshot, MappingSpec, SceneState
from .output.ddp import DDPClient
from .renderer import FastRenderer
@dataclass(slots=True)
class OutputSettings:
output_fps: float = 40.0
preview_fps: float = 12.0
keepalive_seconds: float = 0.35
delta_sending: bool = True
preview_enabled: bool = True
@dataclass(slots=True)
class RuntimeDiagnostics:
tile_count: int
controller_count: int
total_leds: int
output_frames: int = 0
preview_frames: int = 0
packets_built: int = 0
packets_sent: int = 0
changed_controller_frames: int = 0
keepalive_controller_frames: int = 0
stale_output_frames: int = 0
last_render_ms: float = 0.0
last_tick_at: float = 0.0
last_output_at: float = 0.0
last_preview_at: float = 0.0
@dataclass(slots=True)
class TickResult:
timestamp: float
output_due: bool
preview_due: bool
output_snapshot: FrameSnapshot | None = None
preview_snapshot: FrameSnapshot | None = None
changed_payloads: dict[str, bytes] = field(default_factory=dict)
packet_count: int = 0
class RealtimeEngine:
def __init__(
self,
mapping: MappingSpec,
scene: SceneState | None = None,
settings: OutputSettings | None = None,
ddp_client: DDPClient | None = None,
) -> None:
self.mapping = mapping
self.scene = scene or SceneState()
self.settings = settings or OutputSettings()
self.renderer = FastRenderer(mapping)
self.ddp_client = ddp_client or DDPClient()
renderer_stats = self.renderer.stats()
self.diagnostics = RuntimeDiagnostics(
tile_count=renderer_stats.tile_count,
controller_count=renderer_stats.controller_count,
total_leds=renderer_stats.total_leds,
)
self._last_output_at: float | None = None
self._last_preview_at: float | None = None
self._last_sent_payloads: dict[str, bytes] = {}
self._last_sent_at: dict[str, float] = {}
self._latest_snapshot: FrameSnapshot | None = None
self._force_output = True
self._force_preview = True
def set_scene(self, scene: SceneState) -> None:
self.scene = scene
self._force_output = True
self._force_preview = True
def request_preview_refresh(self) -> None:
self._force_preview = True
def latest_snapshot(self) -> FrameSnapshot | None:
return self._latest_snapshot
def tick(self, now: float | None = None) -> TickResult:
timestamp = time.perf_counter() if now is None else float(now)
output_due = self._is_due(self._last_output_at, self._interval(self.settings.output_fps), timestamp, self._force_output)
preview_due = self.settings.preview_enabled and self._is_due(
self._last_preview_at,
self._interval(self.settings.preview_fps),
timestamp,
self._force_preview,
)
if not output_due and not preview_due:
return TickResult(timestamp=timestamp, output_due=False, preview_due=False)
render_started = time.perf_counter()
snapshot = self.renderer.render(self.scene, timestamp=timestamp)
self.diagnostics.last_render_ms = (time.perf_counter() - render_started) * 1000.0
self.diagnostics.last_tick_at = timestamp
self._latest_snapshot = snapshot
changed_payloads: dict[str, bytes] = {}
packet_count = 0
output_snapshot: FrameSnapshot | None = None
preview_snapshot: FrameSnapshot | None = None
if output_due:
changed_payloads, changed_count, keepalive_count = self._select_output_payloads(
snapshot.controller_payloads,
timestamp,
)
output_snapshot = snapshot
self._last_output_at = timestamp
self._force_output = False
self.diagnostics.output_frames += 1
self.diagnostics.changed_controller_frames += changed_count
self.diagnostics.keepalive_controller_frames += keepalive_count
self.diagnostics.last_output_at = timestamp
if changed_payloads:
packet_count = self.ddp_client.count_packets(changed_payloads)
self.diagnostics.packets_built += packet_count
else:
self.diagnostics.stale_output_frames += 1
if preview_due:
preview_snapshot = snapshot
self._last_preview_at = timestamp
self._force_preview = False
self.diagnostics.preview_frames += 1
self.diagnostics.last_preview_at = timestamp
return TickResult(
timestamp=timestamp,
output_due=output_due,
preview_due=preview_due,
output_snapshot=output_snapshot,
preview_snapshot=preview_snapshot,
changed_payloads=changed_payloads,
packet_count=packet_count,
)
def send_due(self, now: float | None = None) -> TickResult:
result = self.tick(now=now)
if result.changed_payloads:
sent = self.ddp_client.send(result.changed_payloads)
self.diagnostics.packets_sent += sent
return result
def _interval(self, fps: float) -> float:
if fps <= 0:
return 0.0
return 1.0 / float(fps)
def _is_due(self, last_at: float | None, interval: float, now: float, forced: bool) -> bool:
if forced or last_at is None:
return True
if interval <= 0:
return True
return (now - last_at) >= max(0.0, interval - 1e-9)
def _select_output_payloads(self, payloads: dict[str, bytes], timestamp: float) -> tuple[dict[str, bytes], int, int]:
if not self.settings.delta_sending:
for host, payload in payloads.items():
self._last_sent_payloads[host] = payload
self._last_sent_at[host] = timestamp
return dict(payloads), len(payloads), 0
changed_payloads: dict[str, bytes] = {}
changed_count = 0
keepalive_count = 0
keepalive_seconds = max(0.0, float(self.settings.keepalive_seconds))
for host, payload in payloads.items():
previous = self._last_sent_payloads.get(host)
last_sent_at = self._last_sent_at.get(host)
unchanged = previous == payload
keepalive_due = (
keepalive_seconds > 0.0
and unchanged
and last_sent_at is not None
and (timestamp - last_sent_at) >= keepalive_seconds
)
if previous is None or not unchanged or keepalive_due:
changed_payloads[host] = payload
self._last_sent_payloads[host] = payload
self._last_sent_at[host] = timestamp
if keepalive_due and unchanged:
keepalive_count += 1
else:
changed_count += 1
return changed_payloads, changed_count, keepalive_count

View File

@@ -0,0 +1,94 @@
from __future__ import annotations
import argparse
from pathlib import Path
import time
from .engine import OutputSettings, RealtimeEngine
from .mapping_xml import load_mapping
from .models import PatternParams, SceneState
def main() -> None:
parser = argparse.ArgumentParser(description="Run the Infinity_Vis_1 live DDP sender.")
parser.add_argument(
"--mapping",
type=Path,
default=Path(__file__).resolve().parents[2] / "sample_data" / "infinity_mirror_mapping_clean.xml",
help="Path to the XML mapping file.",
)
parser.add_argument("--pattern", type=str, default="solid", help="Pattern id.")
parser.add_argument("--tempo", type=float, default=120.0, help="Tempo in BPM.")
parser.add_argument("--fps", type=float, default=40.0, help="Target output FPS.")
parser.add_argument("--preview-fps", type=float, default=0.0, help="Preview cadence for snapshot generation.")
parser.add_argument("--duration", type=float, default=0.0, help="Optional runtime limit in seconds. 0 means until Ctrl+C.")
parser.add_argument("--primary-color", type=str, default="#4D7CFF", help="Primary color in #RRGGBB.")
parser.add_argument("--secondary-color", type=str, default="#0E1630", help="Secondary color in #RRGGBB.")
parser.add_argument("--palette", type=str, default="Laser Club", help="Palette name.")
parser.add_argument("--brightness", type=float, default=1.0, help="Global pattern brightness 0..1.")
parser.add_argument("--tempo-multiplier", type=float, default=1.0, help="Pattern tempo multiplier.")
parser.add_argument("--color-mode", type=str, default="dual", help="Color mode, for example dual or palette.")
parser.add_argument("--strobe-mode", type=str, default="global", help="Strobe mode.")
parser.add_argument("--keepalive", type=float, default=0.35, help="Keepalive interval for unchanged payloads.")
parser.add_argument("--no-delta", action="store_true", help="Always send every controller every frame.")
args = parser.parse_args()
mapping = load_mapping(args.mapping)
scene = SceneState(
pattern_id=args.pattern,
tempo_bpm=args.tempo,
params=PatternParams(
brightness=max(0.0, min(1.0, args.brightness)),
tempo_multiplier=args.tempo_multiplier,
color_mode=args.color_mode,
primary_color=args.primary_color,
secondary_color=args.secondary_color,
palette=args.palette,
strobe_mode=args.strobe_mode,
),
)
settings = OutputSettings(
output_fps=max(1.0, args.fps),
preview_fps=max(0.0, args.preview_fps),
keepalive_seconds=max(0.0, args.keepalive),
delta_sending=not args.no_delta,
preview_enabled=args.preview_fps > 0.0,
)
engine = RealtimeEngine(mapping, scene=scene, settings=settings)
deadline = time.perf_counter() + args.duration if args.duration > 0 else None
last_report = time.perf_counter()
print(f"Infinity_Vis_1 live sender started")
print(f"Mapping: {mapping.name}")
print(f"Pattern: {scene.pattern_id} | Tempo: {scene.tempo_bpm:.1f} BPM | Output FPS: {settings.output_fps:.1f}")
print(f"Controllers: {engine.diagnostics.controller_count} | Tiles: {engine.diagnostics.tile_count} | LEDs: {engine.diagnostics.total_leds}")
print("Press Ctrl+C to stop.")
try:
while True:
now = time.perf_counter()
if deadline is not None and now >= deadline:
break
engine.send_due(now=now)
if now - last_report >= 1.0:
diagnostics = engine.diagnostics
print(
"output={output_frames} preview={preview_frames} packets={packets_sent} "
"changed={changed} keepalive={keepalive} stale={stale} render={render_ms:.2f} ms".format(
output_frames=diagnostics.output_frames,
preview_frames=diagnostics.preview_frames,
packets_sent=diagnostics.packets_sent,
changed=diagnostics.changed_controller_frames,
keepalive=diagnostics.keepalive_controller_frames,
stale=diagnostics.stale_output_frames,
render_ms=diagnostics.last_render_ms,
)
)
last_report = now
time.sleep(0.001)
except KeyboardInterrupt:
print("Stopped by user.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,263 @@
from __future__ import annotations
from pathlib import Path
import ipaddress
import xml.etree.ElementTree as ET
from .models import MappingSpec, SegmentSpec, TileSpec
class MappingError(ValueError):
pass
def _req(node: ET.Element, key: str) -> str:
value = node.get(key)
if value is None:
raise MappingError(f"Missing required attribute {key!r} on <{node.tag}>.")
return value
def _get_int(node: ET.Element | None, key: str, default: int = 0) -> int:
if node is None:
return default
raw = node.get(key)
if raw is None:
return default
try:
return int(raw)
except ValueError as exc:
raise MappingError(f"Invalid integer {raw!r} for {key!r} on <{node.tag}>.") from exc
def _get_float(node: ET.Element | None, key: str, default: float = 0.0) -> float:
if node is None:
return default
raw = node.get(key)
if raw is None:
return default
try:
return float(raw)
except ValueError as exc:
raise MappingError(f"Invalid float {raw!r} for {key!r} on <{node.tag}>.") from exc
def _get_bool(node: ET.Element | None, key: str, default: bool = False) -> bool:
if node is None:
return default
raw = node.get(key)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def load_mapping(path: str | Path) -> MappingSpec:
xml_text = Path(path).read_text(encoding="utf-8")
mapping = load_mapping_from_string(xml_text)
mapping.file_path = Path(path)
return mapping
def load_mapping_from_string(xml_text: str) -> MappingSpec:
try:
root = ET.fromstring(xml_text)
except ET.ParseError as exc:
raise MappingError(f"XML parse error: {exc}") from exc
if root.tag != "InfinityMirrorConfig":
raise MappingError(f"Unexpected root element <{root.tag}>.")
logical = root.find("LogicalDisplay")
defaults = root.find("Defaults")
tiles_node = root.find("Tiles")
if tiles_node is None:
raise MappingError("Missing <Tiles> node.")
tiles: list[TileSpec] = []
for tile_node in tiles_node.findall("Tile"):
segments_node = tile_node.find("Segments")
segments: list[SegmentSpec] = []
for segment_node in segments_node.findall("Segment") if segments_node is not None else []:
segments.append(
SegmentSpec(
name=segment_node.get("name", ""),
side=segment_node.get("side", ""),
start_channel=_get_int(segment_node, "startChannel", 1),
led_count=_get_int(segment_node, "ledCount", 0),
reverse=_get_bool(segment_node, "reverse", False),
orientation_rad=_get_float(segment_node, "orientationRad", 0.0),
x0=_get_float(segment_node, "x0", 0.0),
y0=_get_float(segment_node, "y0", 0.0),
x1=_get_float(segment_node, "x1", 0.0),
y1=_get_float(segment_node, "y1", 0.0),
)
)
calibration_node = tile_node.find("Calibration")
tiles.append(
TileSpec(
tile_id=_req(tile_node, "id"),
row=_get_int(tile_node, "row", 0),
col=_get_int(tile_node, "col", 0),
led_total=_get_int(tile_node, "ledTotal", 0),
controller_ip=tile_node.get("ip", "").strip(),
screen_name=tile_node.get("screenName", ""),
controller_host=tile_node.get("controllerHost", ""),
controller_name=tile_node.get("controllerName", ""),
controller_mac=tile_node.get("controllerMac", ""),
universe=_get_int(tile_node, "universe", 0),
subnet=_get_int(tile_node, "subnet", 0),
enabled=_get_bool(tile_node, "enabled", True),
brightness=_get_float(calibration_node, "brightness", 1.0),
x0=_get_float(tile_node, "x0", 0.0),
y0=_get_float(tile_node, "y0", 0.0),
x1=_get_float(tile_node, "x1", 0.0),
y1=_get_float(tile_node, "y1", 0.0),
segments=sorted(segments, key=lambda item: (item.start_channel, item.name)),
)
)
mapping = MappingSpec(
name=root.get("name", "Infinity Vis 1"),
rows=_get_int(logical, "rows", 3),
cols=_get_int(logical, "cols", 6),
tiles=tiles,
protocol=defaults.findtext("protocol", default="ddp") if defaults is not None else "ddp",
subnet=int(defaults.findtext("subnet", default="0")) if defaults is not None else 0,
color_format=defaults.findtext("colorFormat", default="rgb") if defaults is not None else "rgb",
tile_behavior=defaults.findtext("tileBehavior", default="solid_color_per_tile")
if defaults is not None
else "solid_color_per_tile",
global_gamma=float(defaults.findtext("globalGamma", default="2.2")) if defaults is not None else 2.2,
)
validate_mapping(mapping)
return mapping
def validate_mapping(mapping: MappingSpec) -> None:
if mapping.rows <= 0 or mapping.cols <= 0:
raise MappingError("Rows and columns must be positive.")
if len(mapping.tiles) != mapping.rows * mapping.cols:
raise MappingError(
f"Expected {mapping.rows * mapping.cols} tiles for a {mapping.rows}x{mapping.cols} grid, found {len(mapping.tiles)}."
)
seen_ids: set[str] = set()
seen_positions: set[tuple[int, int]] = set()
for tile in mapping.tiles:
if tile.tile_id in seen_ids:
raise MappingError(f"Duplicate tile id {tile.tile_id!r}.")
seen_ids.add(tile.tile_id)
pos = (tile.row, tile.col)
if pos in seen_positions:
raise MappingError(f"Duplicate tile position row={tile.row}, col={tile.col}.")
seen_positions.add(pos)
if tile.controller_ip:
try:
ipaddress.ip_address(tile.controller_ip)
except ValueError as exc:
raise MappingError(f"Invalid controller IP {tile.controller_ip!r} on {tile.tile_id}.") from exc
segment_leds = sum(max(0, segment.led_count) for segment in tile.segments)
if tile.segments and segment_leds != tile.led_total:
raise MappingError(
f"{tile.tile_id}: ledTotal={tile.led_total} does not match segment sum {segment_leds}."
)
def save_mapping(mapping: MappingSpec, path: str | Path) -> None:
validate_mapping(mapping)
Path(path).write_text(mapping_to_xml_string(mapping), encoding="utf-8")
def mapping_to_xml_string(mapping: MappingSpec) -> str:
root = ET.Element("InfinityMirrorConfig", {"name": mapping.name, "version": "1.0"})
source = ET.SubElement(root, "Source")
ET.SubElement(source, "OriginalExport").text = "Infinity_Vis_1"
ET.SubElement(source, "DerivedFrom").text = "Infinity_Vis_1"
ET.SubElement(source, "OriginalComposition", {"width": "1200", "height": "600"})
ET.SubElement(
root,
"LogicalDisplay",
{
"rows": str(mapping.rows),
"cols": str(mapping.cols),
"previewWidth": "1200",
"previewHeight": "600",
"tileWidth": "200",
"tileHeight": "200",
},
)
defaults = ET.SubElement(root, "Defaults")
ET.SubElement(defaults, "protocol").text = mapping.protocol
ET.SubElement(defaults, "subnet").text = str(mapping.subnet)
ET.SubElement(defaults, "colorFormat").text = mapping.color_format
ET.SubElement(defaults, "tileBehavior").text = mapping.tile_behavior
ET.SubElement(defaults, "globalGamma").text = _fmt(mapping.global_gamma)
tiles_node = ET.SubElement(root, "Tiles")
for tile in mapping.ordered_tiles():
attrs = {
"id": tile.tile_id,
"row": str(tile.row),
"col": str(tile.col),
"screenName": tile.screen_name,
"ip": tile.controller_ip,
"universe": str(tile.universe),
"subnet": str(tile.subnet),
"ledTotal": str(tile.led_total),
"x0": _fmt(tile.x0),
"y0": _fmt(tile.y0),
"x1": _fmt(tile.x1),
"y1": _fmt(tile.y1),
"enabled": "true" if tile.enabled else "false",
}
if tile.controller_host:
attrs["controllerHost"] = tile.controller_host
if tile.controller_name:
attrs["controllerName"] = tile.controller_name
if tile.controller_mac:
attrs["controllerMac"] = tile.controller_mac
tile_node = ET.SubElement(tiles_node, "Tile", attrs)
ET.SubElement(
tile_node,
"Calibration",
{
"brightness": _fmt(tile.brightness),
"redGain": "1",
"greenGain": "1",
"blueGain": "1",
},
)
segments_node = ET.SubElement(tile_node, "Segments")
for segment in tile.segments:
ET.SubElement(
segments_node,
"Segment",
{
"name": segment.name,
"side": segment.side,
"startChannel": str(segment.start_channel),
"ledCount": str(segment.led_count),
"orientationRad": _fmt(segment.orientation_rad),
"x0": _fmt(segment.x0),
"y0": _fmt(segment.y0),
"x1": _fmt(segment.x1),
"y1": _fmt(segment.y1),
"reverse": "true" if segment.reverse else "false",
},
)
ET.indent(root, space=" ")
return ET.tostring(root, encoding="unicode", xml_declaration=True)
def _fmt(value: float) -> str:
text = f"{float(value):.6f}".rstrip("0").rstrip(".")
return text if text else "0"

View File

@@ -0,0 +1,164 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
RGB = tuple[int, int, int]
@dataclass(slots=True)
class SegmentSpec:
name: str
side: str
start_channel: int
led_count: int
reverse: bool = False
orientation_rad: float = 0.0
x0: float = 0.0
y0: float = 0.0
x1: float = 0.0
y1: float = 0.0
@property
def byte_offset(self) -> int:
return max(0, self.start_channel - 1)
@dataclass(slots=True)
class TileSpec:
tile_id: str
row: int
col: int
led_total: int
controller_ip: str = ""
screen_name: str = ""
controller_host: str = ""
controller_name: str = ""
controller_mac: str = ""
universe: int = 0
subnet: int = 0
enabled: bool = True
brightness: float = 1.0
x0: float = 0.0
y0: float = 0.0
x1: float = 0.0
y1: float = 0.0
segments: list[SegmentSpec] = field(default_factory=list)
@property
def controller_payload_bytes(self) -> int:
return max(0, self.led_total) * 3
@dataclass(slots=True)
class ControllerTileRoute:
tile_id: str
controller_offset_bytes: int
segment_offsets: tuple[tuple[int, int], ...]
@dataclass(slots=True)
class ControllerSpec:
host: str
payload_bytes: int
tiles: tuple[ControllerTileRoute, ...]
@dataclass(slots=True)
class MappingSpec:
name: str = "Infinity Vis 1"
rows: int = 3
cols: int = 6
tiles: list[TileSpec] = field(default_factory=list)
file_path: Path | None = None
protocol: str = "ddp"
subnet: int = 0
color_format: str = "rgb"
tile_behavior: str = "solid_color_per_tile"
global_gamma: float = 2.2
def ordered_tiles(self) -> list[TileSpec]:
return sorted(self.tiles, key=lambda tile: (tile.row, tile.col, tile.tile_id))
def tile_lookup(self) -> dict[str, TileSpec]:
return {tile.tile_id: tile for tile in self.tiles}
def build_controllers(self) -> list[ControllerSpec]:
groups: dict[str, list[TileSpec]] = {}
for tile in self.ordered_tiles():
if not tile.enabled:
continue
host = tile.controller_ip.strip()
if not host:
continue
groups.setdefault(host, []).append(tile)
controllers: list[ControllerSpec] = []
for host, tiles in groups.items():
running_offset = 0
routes: list[ControllerTileRoute] = []
for tile in tiles:
segment_offsets = tuple(
(running_offset + segment.byte_offset, segment.led_count)
for segment in sorted(tile.segments, key=lambda item: (item.start_channel, item.name))
)
routes.append(
ControllerTileRoute(
tile_id=tile.tile_id,
controller_offset_bytes=running_offset,
segment_offsets=segment_offsets,
)
)
running_offset += tile.controller_payload_bytes
controllers.append(
ControllerSpec(
host=host,
payload_bytes=running_offset,
tiles=tuple(routes),
)
)
return sorted(controllers, key=lambda item: item.host)
@dataclass(slots=True)
class PatternParams:
brightness: float = 1.0
tempo_multiplier: float = 1.0
direction: str = "left_to_right"
checker_mode: str = "classic"
scan_style: str = "line"
angle: float = 0.0
on_width: float = 1.0
off_width: float = 1.0
strobe_mode: str = "global"
stopwatch_mode: str = "sync"
color_mode: str = "dual"
primary_color: str = "#4D7CFF"
secondary_color: str = "#0E1630"
palette: str = "Laser Club"
symmetry: str = "none"
center_pulse_mode: str = "expand"
step_size: float = 1.0
block_size: float = 1.0
pixel_group_size: float = 1.0
strobe_duty_cycle: float = 0.5
randomness: float = 0.35
@dataclass(slots=True)
class SceneState:
pattern_id: str = "solid"
params: PatternParams = field(default_factory=PatternParams)
tempo_bpm: float = 120.0
utility_mode: Literal["none", "blackout", "single_tile", "identify"] = "none"
selected_tile_id: str | None = None
@dataclass(slots=True)
class FrameSnapshot:
timestamp: float
pattern_id: str
tile_colors: list[RGB]
controller_payloads: dict[str, bytes]

View File

@@ -0,0 +1 @@
"""Output backends for Infinity_Vis_1."""

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
import socket
import struct
DDP_DEFAULT_PORT = 4048
DDP_MAX_DATA_LENGTH = 1440
DDP_VERSION_1 = 0x40
DDP_PUSH_FLAG = 0x01
DDP_RGB888 = 0x0B
DDP_DESTINATION_ID = 1
class DDPClient:
def __init__(self, port: int = DDP_DEFAULT_PORT) -> None:
self.port = int(port)
self._sequence = 1
def count_packets(self, host_payloads: dict[str, bytes]) -> int:
packet_count = 0
for payload in host_payloads.values():
chunks, remainder = divmod(len(payload), DDP_MAX_DATA_LENGTH)
packet_count += chunks + (1 if remainder else 0)
return packet_count
def build_packets(self, host_payloads: dict[str, bytes]) -> list[tuple[str, bytes]]:
packets: list[tuple[str, bytes]] = []
sequence = self._next_sequence()
for host, payload in host_payloads.items():
for offset in range(0, len(payload), DDP_MAX_DATA_LENGTH):
chunk = payload[offset : offset + DDP_MAX_DATA_LENGTH]
last = offset + DDP_MAX_DATA_LENGTH >= len(payload)
header = struct.pack(
"!BBBBLH",
DDP_VERSION_1 | (DDP_PUSH_FLAG if last else 0),
sequence,
DDP_RGB888,
DDP_DESTINATION_ID,
offset,
len(chunk),
)
packets.append((host, header + chunk))
return packets
def send(self, host_payloads: dict[str, bytes]) -> int:
packets = self.build_packets(host_payloads)
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
for host, packet in packets:
sock.sendto(packet, (host, self.port))
return len(packets)
def _next_sequence(self) -> int:
sequence = self._sequence
self._sequence = 1 if self._sequence >= 15 else self._sequence + 1
return sequence

View File

@@ -0,0 +1,257 @@
from __future__ import annotations
import math
import random
from dataclasses import dataclass
from .colors import mix, palette_color, rgb_from_hex, scale
from .models import MappingSpec, PatternParams, RGB
@dataclass(slots=True)
class TilePoint:
row: int
col: int
u: float
v: float
index: int
@property
def tile_key(self) -> str:
return f"{self.row}:{self.col}"
def pattern_ids() -> list[str]:
return sorted(PATTERN_REGISTRY)
def render_pattern(mapping: MappingSpec, pattern_id: str, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
points = _tile_points(mapping)
renderer = PATTERN_REGISTRY.get(pattern_id, _render_solid)
return renderer(points, mapping, params, time_s, tempo_bpm)
def utility_pattern(mapping: MappingSpec, utility_mode: str, selected_tile_id: str | None, time_s: float) -> list[RGB]:
points = _tile_points(mapping)
ordered = mapping.ordered_tiles()
if utility_mode == "blackout":
return [(0, 0, 0) for _ in points]
if utility_mode == "identify":
flash = _wave(time_s * 8.0)
color = (255, 255, 255) if flash > 0.5 else (0, 0, 0)
return [color for _ in points]
if utility_mode == "single_tile":
return [(255, 255, 255) if ordered[index].tile_id == selected_tile_id else (0, 0, 0) for index, _point in enumerate(points)]
return [(0, 0, 0) for _ in points]
def _tile_points(mapping: MappingSpec) -> list[TilePoint]:
rows = max(1, mapping.rows - 1)
cols = max(1, mapping.cols - 1)
points: list[TilePoint] = []
for index, tile in enumerate(mapping.ordered_tiles()):
points.append(
TilePoint(
row=tile.row,
col=tile.col,
u=(tile.col - 1) / cols,
v=(tile.row - 1) / rows,
index=index,
)
)
return points
def _primary_secondary(params: PatternParams, phase: float = 0.5) -> tuple[RGB, RGB]:
if params.color_mode == "palette":
return palette_color(params.palette, phase), palette_color(params.palette, (phase + 0.33) % 1.0)
return rgb_from_hex(params.primary_color), rgb_from_hex(params.secondary_color)
def _wave(value: float) -> float:
return 0.5 + 0.5 * math.sin(value)
def _axis_projection(point: TilePoint, angle_deg: float) -> float:
angle = math.radians(angle_deg % 360.0)
x = point.u - 0.5
y = point.v - 0.5
return (x * math.cos(angle)) + (y * math.sin(angle))
def _direction_progress(point: TilePoint, direction: str) -> float:
if direction == "right_to_left":
return 1.0 - point.u
if direction == "bottom_to_top":
return 1.0 - point.v
if direction == "top_to_bottom":
return point.v
return point.u
def _render_solid(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
primary, _secondary = _primary_secondary(params, 0.55)
return [scale(primary, params.brightness) for _ in points]
def _render_checker(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
return [scale(a if (point.row + point.col) % 2 == 0 else b, params.brightness) for point in points]
def _render_row_gradient(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
return [scale(mix(a, b, point.v), params.brightness) for point in points]
def _render_column_gradient(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
return [scale(mix(a, b, point.u), params.brightness) for point in points]
def _render_breathing(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, _b = _primary_secondary(params, 0.4)
pulse = 0.15 + 0.85 * _wave(time_s * (tempo_bpm / 60.0) * params.tempo_multiplier)
return [scale(a, params.brightness * pulse) for _ in points]
def _render_center_pulse(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
pulse = _wave(time_s * (tempo_bpm / 60.0) * params.tempo_multiplier)
colors: list[RGB] = []
for point in points:
dx = point.u - 0.5
dy = point.v - 0.5
dist = math.sqrt(dx * dx + dy * dy) * 1.5
strength = max(0.0, 1.0 - abs(dist - pulse))
colors.append(scale(mix(b, a, strength), params.brightness))
return colors
def _render_sparkle(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
rng = random.Random(int(time_s * 120.0))
return [scale(a if rng.random() < (0.15 + params.randomness * 0.35) else b, params.brightness) for _ in points]
def _render_wave_line(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
phase = time_s * (tempo_bpm / 60.0) * params.tempo_multiplier
colors: list[RGB] = []
for point in points:
value = _wave((point.u * math.pi * 4.0) + phase * math.pi * 2.0 + point.v * 1.2)
colors.append(scale(mix(b, a, value), params.brightness))
return colors
def _render_scan(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
position = ((time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0) * 2.0 - 1.0
band = max(0.08, min(1.2, params.on_width * 0.35))
colors: list[RGB] = []
for point in points:
projection = _axis_projection(point, params.angle)
active = abs(projection - position) <= band
colors.append(scale(a if active else b, params.brightness))
return colors
def _render_arrow(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
head = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
colors: list[RGB] = []
for point in points:
progress = _direction_progress(point, params.direction)
width = 0.15 + 0.25 * (1.0 - abs(point.v - 0.5) * 2.0)
colors.append(scale(a if abs(progress - head) < width else b, params.brightness))
return colors
def _render_scan_dual(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
progress = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
colors: list[RGB] = []
for point in points:
axis = _direction_progress(point, params.direction)
active = abs(axis - progress) < 0.2 or abs(axis - (1.0 - progress)) < 0.2
colors.append(scale(a if active else b, params.brightness))
return colors
def _render_sweep(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
progress = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
return [scale(a if _direction_progress(point, params.direction) <= progress else b, params.brightness) for point in points]
def _render_saw(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
shift = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
return [scale(mix(b, a, (_direction_progress(point, params.direction) + shift) % 1.0), params.brightness) for point in points]
def _render_two_dots(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
p1 = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
p2 = (p1 + 0.5) % 1.0
colors: list[RGB] = []
for point in points:
axis = _direction_progress(point, params.direction)
active = abs(axis - p1) < 0.12 or abs(axis - p2) < 0.12
colors.append(scale(a if active else b, params.brightness))
return colors
def _render_strobe(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
phase = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
on = phase < max(0.02, min(0.98, params.strobe_duty_cycle))
if params.strobe_mode == "global":
return [scale(a if on else b, params.brightness) for _ in points]
rng = random.Random(int(time_s * 80.0))
result: list[RGB] = []
for point in points:
local_on = on
if params.strobe_mode in {"random_pixels", "random_leds"}:
local_on = rng.random() > 0.5
elif params.strobe_mode == "checker":
local_on = on and ((point.row + point.col) % 2 == 0)
result.append(scale(a if local_on else b, params.brightness))
return result
def _render_stopwatch(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
highlight = int(time_s) % max(1, len(points))
return [scale(a if point.index == highlight else b, params.brightness) for point in points]
def _render_snake(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
a, b = _primary_secondary(params)
order = sorted(points, key=lambda point: (point.row, point.col if point.row % 2 else -point.col))
head = int(time_s * (tempo_bpm / 60.0) * params.tempo_multiplier * max(1.0, params.step_size)) % len(order)
tail = max(1, int(params.block_size))
active = {order[(head - offset) % len(order)].tile_key for offset in range(tail)}
return [scale(a if point.tile_key in active else b, params.brightness) for point in points]
PATTERN_REGISTRY = {
"arrow": _render_arrow,
"breathing": _render_breathing,
"center_pulse": _render_center_pulse,
"checker": _render_checker,
"column_gradient": _render_column_gradient,
"row_gradient": _render_row_gradient,
"saw": _render_saw,
"scan": _render_scan,
"scan_dual": _render_scan_dual,
"snake": _render_snake,
"solid": _render_solid,
"sparkle": _render_sparkle,
"stopwatch": _render_stopwatch,
"strobe": _render_strobe,
"sweep": _render_sweep,
"two_dots": _render_two_dots,
"wave_line": _render_wave_line,
}

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
import json
import re
from .models import PatternParams, SceneState
@dataclass(slots=True)
class PresetRecord:
name: str
pattern_id: str
tempo_bpm: float
params: dict
created_at: str
@classmethod
def from_scene(cls, name: str, scene: SceneState) -> "PresetRecord":
return cls(
name=name,
pattern_id=scene.pattern_id,
tempo_bpm=scene.tempo_bpm,
params=asdict(scene.params),
created_at=datetime.utcnow().isoformat(timespec="seconds"),
)
class PresetStore:
def __init__(self, root: str | Path) -> None:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
def save(self, record: PresetRecord) -> Path:
path = self.root / f"{slugify(record.name)}.json"
path.write_text(json.dumps(asdict(record), indent=2), encoding="utf-8")
return path
def list(self) -> list[PresetRecord]:
presets: list[PresetRecord] = []
for path in sorted(self.root.glob("*.json")):
try:
payload = json.loads(path.read_text(encoding="utf-8"))
presets.append(PresetRecord(**payload))
except (OSError, json.JSONDecodeError, TypeError):
continue
return presets
def load_scene(self, name: str) -> SceneState:
path = self.root / f"{slugify(name)}.json"
payload = json.loads(path.read_text(encoding="utf-8"))
record = PresetRecord(**payload)
return SceneState(
pattern_id=record.pattern_id,
tempo_bpm=float(record.tempo_bpm),
params=PatternParams(**record.params),
)
def slugify(value: str) -> str:
return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") or "preset"

View File

@@ -0,0 +1,90 @@
from __future__ import annotations
from dataclasses import dataclass
import time
from .colors import scale
from .models import ControllerSpec, FrameSnapshot, MappingSpec, RGB, SceneState
from .patterns import render_pattern, utility_pattern
@dataclass(slots=True)
class RendererStats:
tile_count: int
controller_count: int
total_leds: int
class FastRenderer:
def __init__(self, mapping: MappingSpec) -> None:
self.mapping = mapping
self.tiles = mapping.ordered_tiles()
self.controllers: list[ControllerSpec] = mapping.build_controllers()
self._tile_index = {tile.tile_id: index for index, tile in enumerate(self.tiles)}
self._controller_buffers = {controller.host: bytearray(controller.payload_bytes) for controller in self.controllers}
self._controller_zero = {host: bytes(len(buffer)) for host, buffer in self._controller_buffers.items()}
self._gamma_table = self._build_gamma_table(mapping.global_gamma)
def stats(self) -> RendererStats:
return RendererStats(
tile_count=len(self.tiles),
controller_count=len(self.controllers),
total_leds=sum(tile.led_total for tile in self.tiles),
)
def render(self, scene: SceneState, timestamp: float | None = None) -> FrameSnapshot:
now = time.perf_counter() if timestamp is None else float(timestamp)
if scene.utility_mode != "none":
tile_colors = utility_pattern(self.mapping, scene.utility_mode, scene.selected_tile_id, now)
else:
tile_colors = render_pattern(self.mapping, scene.pattern_id, scene.params, now, scene.tempo_bpm)
controller_payloads = self._build_controller_payloads(tile_colors)
return FrameSnapshot(
timestamp=now,
pattern_id=scene.pattern_id,
tile_colors=tile_colors,
controller_payloads=controller_payloads,
)
def _build_controller_payloads(self, tile_colors: list[RGB]) -> dict[str, bytes]:
for host, buffer in self._controller_buffers.items():
buffer[:] = self._controller_zero[host]
for controller in self.controllers:
payload = self._controller_buffers[controller.host]
for route in controller.tiles:
tile = self.tiles[self._tile_index[route.tile_id]]
color = tile_colors[self._tile_index[route.tile_id]]
color = self._apply_tile_brightness(color, tile.brightness)
for byte_offset, led_count in route.segment_offsets:
self._fill_segment(payload, byte_offset, led_count, color)
return {host: bytes(buffer) for host, buffer in self._controller_buffers.items()}
def _apply_tile_brightness(self, color: RGB, brightness: float) -> RGB:
scaled = scale(color, brightness)
return (
self._gamma_table[scaled[0]],
self._gamma_table[scaled[1]],
self._gamma_table[scaled[2]],
)
def _build_gamma_table(self, gamma: float) -> list[int]:
gamma = max(0.1, float(gamma))
table = [0] * 256
for value in range(256):
normalized = value / 255.0
table[value] = max(0, min(255, int(round((normalized ** gamma) * 255.0))))
return table
def _fill_segment(self, payload: bytearray, byte_offset: int, led_count: int, color: RGB) -> None:
red, green, blue = color
cursor = byte_offset
for _ in range(max(0, led_count)):
if cursor + 2 >= len(payload):
break
payload[cursor] = red
payload[cursor + 1] = green
payload[cursor + 2] = blue
cursor += 3

View File

@@ -0,0 +1,18 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "infinity-vis-1"
version = "0.1.0"
description = "Fresh performance-oriented rebuild of Infinity Vis."
readme = "README.md"
requires-python = ">=3.10"
dependencies = []
[project.scripts]
infinity-vis-1-live = "infinity_vis_1.live:main"
infinity-vis-1-benchmark = "infinity_vis_1.benchmark:main"
[tool.setuptools.packages.find]
include = ["infinity_vis_1*"]

View File

@@ -0,0 +1,100 @@
import unittest
from Infinity_Vis_1.infinity_vis_1.engine import OutputSettings, RealtimeEngine
from Infinity_Vis_1.infinity_vis_1.models import MappingSpec, PatternParams, SceneState, SegmentSpec, TileSpec
class RecordingDDPClient:
def __init__(self) -> None:
self.sent_payloads: list[dict[str, bytes]] = []
def count_packets(self, host_payloads: dict[str, bytes]) -> int:
return len(host_payloads)
def send(self, host_payloads: dict[str, bytes]) -> int:
self.sent_payloads.append(dict(host_payloads))
return self.count_packets(host_payloads)
class EngineTests(unittest.TestCase):
def _mapping(self) -> MappingSpec:
return MappingSpec(
name="engine-test",
rows=1,
cols=2,
tiles=[
TileSpec(
tile_id="r1c1",
row=1,
col=1,
led_total=2,
controller_ip="192.168.0.10",
segments=[SegmentSpec(name="all", side="top", start_channel=1, led_count=2)],
),
TileSpec(
tile_id="r1c2",
row=1,
col=2,
led_total=2,
controller_ip="192.168.0.10",
segments=[SegmentSpec(name="all", side="top", start_channel=1, led_count=2)],
),
],
)
def test_preview_rate_is_decoupled_from_output_rate(self) -> None:
engine = RealtimeEngine(
self._mapping(),
scene=SceneState(pattern_id="solid", params=PatternParams(primary_color="#FF0000")),
settings=OutputSettings(output_fps=40.0, preview_fps=10.0, delta_sending=False),
ddp_client=RecordingDDPClient(),
)
first = engine.tick(now=1.0)
second = engine.tick(now=1.03)
third = engine.tick(now=1.11)
self.assertTrue(first.output_due)
self.assertTrue(first.preview_due)
self.assertTrue(second.output_due)
self.assertFalse(second.preview_due)
self.assertTrue(third.output_due)
self.assertTrue(third.preview_due)
self.assertEqual(engine.diagnostics.output_frames, 3)
self.assertEqual(engine.diagnostics.preview_frames, 2)
def test_delta_sending_skips_unchanged_frames_but_keeps_keepalive(self) -> None:
engine = RealtimeEngine(
self._mapping(),
scene=SceneState(pattern_id="solid", params=PatternParams(primary_color="#00FF00")),
settings=OutputSettings(output_fps=20.0, preview_fps=0.0, delta_sending=True, keepalive_seconds=0.35),
ddp_client=RecordingDDPClient(),
)
first = engine.tick(now=1.00)
second = engine.tick(now=1.05)
third = engine.tick(now=1.41)
self.assertEqual(list(first.changed_payloads), ["192.168.0.10"])
self.assertEqual(second.changed_payloads, {})
self.assertEqual(list(third.changed_payloads), ["192.168.0.10"])
self.assertEqual(engine.diagnostics.stale_output_frames, 1)
self.assertEqual(engine.diagnostics.keepalive_controller_frames, 1)
def test_send_due_uses_client_without_network(self) -> None:
client = RecordingDDPClient()
engine = RealtimeEngine(
self._mapping(),
scene=SceneState(pattern_id="solid", params=PatternParams(primary_color="#112233")),
settings=OutputSettings(output_fps=30.0, preview_fps=0.0, delta_sending=False),
ddp_client=client,
)
result = engine.send_due(now=2.0)
self.assertEqual(result.packet_count, 1)
self.assertEqual(len(client.sent_payloads), 1)
self.assertEqual(engine.diagnostics.packets_sent, 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,27 @@
from pathlib import Path
import unittest
from Infinity_Vis_1.infinity_vis_1.mapping_xml import load_mapping, load_mapping_from_string, mapping_to_xml_string
class MappingXmlTests(unittest.TestCase):
def test_load_sample_mapping(self) -> None:
path = Path(__file__).resolve().parents[2] / "sample_data" / "infinity_mirror_mapping_clean.xml"
mapping = load_mapping(path)
self.assertEqual(mapping.rows, 3)
self.assertEqual(mapping.cols, 6)
self.assertEqual(len(mapping.tiles), 18)
self.assertEqual(mapping.tiles[0].led_total, 106)
def test_round_trip_preserves_tile_count(self) -> None:
path = Path(__file__).resolve().parents[2] / "sample_data" / "infinity_mirror_mapping_clean.xml"
mapping = load_mapping(path)
xml_text = mapping_to_xml_string(mapping)
restored = load_mapping_from_string(xml_text)
self.assertEqual(len(restored.tiles), len(mapping.tiles))
self.assertEqual(restored.rows, mapping.rows)
self.assertEqual(restored.cols, mapping.cols)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,48 @@
import unittest
from Infinity_Vis_1.infinity_vis_1.models import MappingSpec, PatternParams, SceneState, SegmentSpec, TileSpec
from Infinity_Vis_1.infinity_vis_1.renderer import FastRenderer
class RendererTests(unittest.TestCase):
def _mapping(self) -> MappingSpec:
return MappingSpec(
name="test",
rows=1,
cols=2,
tiles=[
TileSpec(
tile_id="r1c1",
row=1,
col=1,
led_total=2,
controller_ip="192.168.0.10",
segments=[SegmentSpec(name="all", side="top", start_channel=1, led_count=2)],
),
TileSpec(
tile_id="r1c2",
row=1,
col=2,
led_total=2,
controller_ip="192.168.0.10",
segments=[SegmentSpec(name="all", side="top", start_channel=1, led_count=2)],
),
],
)
def test_shared_ip_tiles_produce_one_controller_payload(self) -> None:
renderer = FastRenderer(self._mapping())
scene = SceneState(pattern_id="solid", params=PatternParams(primary_color="#FF0000"))
frame = renderer.render(scene, timestamp=1.0)
self.assertEqual(list(frame.controller_payloads), ["192.168.0.10"])
self.assertEqual(len(frame.controller_payloads["192.168.0.10"]), 12)
def test_blackout_utility_zeroes_payload(self) -> None:
renderer = FastRenderer(self._mapping())
scene = SceneState(pattern_id="solid", utility_mode="blackout")
frame = renderer.render(scene, timestamp=1.0)
self.assertEqual(frame.controller_payloads["192.168.0.10"], bytes(12))
if __name__ == "__main__":
unittest.main()