Compare commits
1 Commits
main
...
feature/in
| Author | SHA1 | Date | |
|---|---|---|---|
| 3eb702a762 |
44
Infinity_Vis_1/ARCHITECTURE.md
Normal file
44
Infinity_Vis_1/ARCHITECTURE.md
Normal file
@@ -0,0 +1,44 @@
|
||||
# Infinity_Vis_1 Architecture
|
||||
|
||||
## Design goal
|
||||
|
||||
Make the render path cheaper than the current application by replacing
|
||||
object-heavy frame construction with a compact, controller-oriented runtime.
|
||||
|
||||
## High-level layers
|
||||
|
||||
1. `mapping_xml.py`
|
||||
Parses and writes the existing XML mapping format.
|
||||
2. `models.py`
|
||||
Small `slots=True` dataclasses for tiles, segments, controllers, scenes, and frames.
|
||||
3. `patterns.py`
|
||||
Stateless pattern functions that return tile colors only.
|
||||
4. `renderer.py`
|
||||
Owns precomputed controller routes and reusable `bytearray` payloads.
|
||||
5. `engine.py`
|
||||
Separates output cadence from preview cadence and handles delta sending.
|
||||
6. `output/ddp.py`
|
||||
Sends already-built payloads as DDP packets.
|
||||
7. `presets.py`
|
||||
JSON preset persistence.
|
||||
8. `benchmark.py`
|
||||
Simple measurable entry point for render throughput.
|
||||
|
||||
## Main performance choices
|
||||
|
||||
- Render tile colors first, not rich nested preview objects.
|
||||
- Reuse controller payload buffers every frame.
|
||||
- Precompute segment byte offsets once at startup.
|
||||
- Keep the runtime pure-stdlib for the new core.
|
||||
- Keep preview as a consumer of a compact frame snapshot, not as the center of the engine.
|
||||
- Decouple preview timing from hardware output timing so UI cost cannot directly cap live output.
|
||||
- Skip unchanged controller payloads while still sending periodic keepalives.
|
||||
|
||||
## What is intentionally not in phase 1
|
||||
|
||||
- Full Qt UI parity
|
||||
- Full preview painter parity
|
||||
- WLED discovery UI port
|
||||
- Advanced transition compositing parity
|
||||
|
||||
Those belong in phase 2, after the faster runtime is proven and benchmarked.
|
||||
124
Infinity_Vis_1/FEATURE_INVENTORY.md
Normal file
124
Infinity_Vis_1/FEATURE_INVENTORY.md
Normal file
@@ -0,0 +1,124 @@
|
||||
# Current Feature Inventory
|
||||
|
||||
This inventory was collected from the current `app/` codebase and is the scope
|
||||
target for the clean rebuild.
|
||||
|
||||
## Core control flow
|
||||
|
||||
- Mapping/project open, save, and save-as
|
||||
- Startup auto-load of sample mapping
|
||||
- Live scene and Next scene
|
||||
- FOH mode with `Go` and `Fade Go`
|
||||
- Tempo transport and transition time control
|
||||
- Utility modes such as blackout and single-tile white test
|
||||
|
||||
## Pattern system
|
||||
|
||||
Current built-in patterns:
|
||||
|
||||
- `solid`
|
||||
- `checker`
|
||||
- `row_gradient`
|
||||
- `column_gradient`
|
||||
- `center_pulse`
|
||||
- `sparkle`
|
||||
- `breathing`
|
||||
- `wave_line`
|
||||
- `scan`
|
||||
- `arrow`
|
||||
- `scan_dual`
|
||||
- `sweep`
|
||||
- `saw`
|
||||
- `two_dots`
|
||||
- `strobe`
|
||||
- `stopwatch`
|
||||
- `snake`
|
||||
|
||||
Pattern parameter surface in the current app includes:
|
||||
|
||||
- brightness
|
||||
- fade / smoothing
|
||||
- tempo multiplier
|
||||
- direction
|
||||
- checker mode
|
||||
- scan style and angle
|
||||
- on/off width
|
||||
- band thickness
|
||||
- flip horizontal / vertical
|
||||
- strobe mode and duty cycle
|
||||
- stopwatch mode
|
||||
- color mode
|
||||
- primary / secondary colors
|
||||
- palette
|
||||
- symmetry
|
||||
- center pulse mode
|
||||
- step size
|
||||
- block size
|
||||
- pixel group size
|
||||
- randomness
|
||||
|
||||
## Preview and UI
|
||||
|
||||
- Main desktop Qt window
|
||||
- Pattern panel
|
||||
- Preset browser
|
||||
- Tile preview / technical preview / LEDs-only preview
|
||||
- Fullscreen preview window
|
||||
- Selected tile panel
|
||||
- Utility controls
|
||||
- Output diagnostics panel
|
||||
|
||||
## Mapping and editing
|
||||
|
||||
- XML mapping load/save/validation
|
||||
- Tile table editing
|
||||
- Segment table editing
|
||||
- Raw XML editor
|
||||
- Calibration and enable flags per tile
|
||||
- Controller IP / host / name / MAC metadata persistence
|
||||
|
||||
## WLED-assisted mapping
|
||||
|
||||
- Network scan for WLED devices
|
||||
- Device discovery via `/json/info` and `/json`
|
||||
- Per-device identify action
|
||||
- Tile assignment workflow inside Mapping Settings
|
||||
- Immediate persistence back to the mapping file
|
||||
|
||||
## Presets
|
||||
|
||||
- JSON-backed preset storage
|
||||
- Seed presets on first run
|
||||
- Save, load, and delete presets
|
||||
|
||||
## Output
|
||||
|
||||
- Preview backend
|
||||
- DDP backend for WLED
|
||||
- Art-Net backend seam
|
||||
- Output enable/disable
|
||||
- Output FPS target
|
||||
- Render FPS / Send FPS / Output health diagnostics
|
||||
- Controller-side FPS polling for WLED live mode
|
||||
|
||||
## Performance pain points in the current code
|
||||
|
||||
- A lot of per-frame object churn:
|
||||
- `PreviewFrame`
|
||||
- `TileFrame`
|
||||
- `TilePatternSample`
|
||||
- many `RGBColor` instances
|
||||
- nested `dict[str, list[RGBColor]]` payloads
|
||||
- Per-frame rebuilding of LED pixel lists for each segment
|
||||
- Preview and output coordination still centered around one controller loop
|
||||
- Multiple render engines and transition helpers duplicating state
|
||||
- DDP payload generation starts from nested Python objects instead of a compact frame buffer
|
||||
|
||||
## Rebuild priorities for Infinity_Vis_1
|
||||
|
||||
- fixed-size runtime state
|
||||
- lower per-frame allocation
|
||||
- preview cadence separated from hardware output cadence
|
||||
- controller-first payload generation
|
||||
- shared-IP multi-tile controller topology as a first-class concept
|
||||
- clean feature layering so UI, preview, render, and output can scale independently
|
||||
67
Infinity_Vis_1/MIGRATION_STATUS.md
Normal file
67
Infinity_Vis_1/MIGRATION_STATUS.md
Normal file
@@ -0,0 +1,67 @@
|
||||
# Infinity_Vis_1 Migration Status
|
||||
|
||||
This file tracks the legacy feature surface against the clean rebuild so we can
|
||||
rebuild deliberately instead of drifting into another tangled codebase.
|
||||
|
||||
## Core runtime
|
||||
|
||||
| Feature | Legacy app | Infinity_Vis_1 |
|
||||
| --- | --- | --- |
|
||||
| XML mapping load/save | Yes | Yes |
|
||||
| Mapping validation | Yes | Yes |
|
||||
| Shared-IP controller grouping | Yes | Yes |
|
||||
| Tile-color pattern render path | Yes | Yes |
|
||||
| Direct DDP payload generation | Yes | Yes |
|
||||
| Output/preview timing split | Partial | Yes |
|
||||
| Delta sending with keepalive | Partial | Yes |
|
||||
| Benchmark tooling | Partial | Yes |
|
||||
|
||||
## Patterns
|
||||
|
||||
| Pattern | Legacy app | Infinity_Vis_1 |
|
||||
| --- | --- | --- |
|
||||
| solid | Yes | Yes |
|
||||
| checker | Yes | Yes |
|
||||
| row_gradient | Yes | Yes |
|
||||
| column_gradient | Yes | Yes |
|
||||
| center_pulse | Yes | Yes |
|
||||
| sparkle | Yes | Yes |
|
||||
| breathing | Yes | Yes |
|
||||
| wave_line | Yes | Yes |
|
||||
| scan | Yes | Yes |
|
||||
| arrow | Yes | Yes |
|
||||
| scan_dual | Yes | Yes |
|
||||
| sweep | Yes | Yes |
|
||||
| saw | Yes | Yes |
|
||||
| two_dots | Yes | Yes |
|
||||
| strobe | Yes | Yes |
|
||||
| stopwatch | Yes | Yes |
|
||||
| snake | Yes | Yes |
|
||||
|
||||
## Presets and project data
|
||||
|
||||
| Feature | Legacy app | Infinity_Vis_1 |
|
||||
| --- | --- | --- |
|
||||
| JSON preset storage | Yes | Yes |
|
||||
| Project scene state | Yes | Yes |
|
||||
| Controller metadata persistence | Yes | Yes |
|
||||
| WLED mapping metadata in XML | Yes | Yes |
|
||||
|
||||
## UI and operator workflow
|
||||
|
||||
| Feature | Legacy app | Infinity_Vis_1 |
|
||||
| --- | --- | --- |
|
||||
| Desktop Qt main window | Yes | Not yet |
|
||||
| Full preview painter | Yes | Not yet |
|
||||
| FOH Go/Fade Go workflow | Yes | Not yet |
|
||||
| Preset browser UI | Yes | Not yet |
|
||||
| Selected tile tools | Yes | Not yet |
|
||||
| Mapping Settings dialog | Yes | Not yet |
|
||||
| WLED assisted mapping UI | Yes | Not yet |
|
||||
| Output diagnostics panel | Yes | Not yet |
|
||||
|
||||
## Phase order
|
||||
|
||||
1. Prove the faster core with tests and benchmarks.
|
||||
2. Rebuild scene transport and operator workflows on top of the new engine.
|
||||
3. Rebuild the desktop UI as a thin shell over the new runtime instead of the old state-heavy controller loop.
|
||||
72
Infinity_Vis_1/README.md
Normal file
72
Infinity_Vis_1/README.md
Normal file
@@ -0,0 +1,72 @@
|
||||
# Infinity_Vis_1
|
||||
|
||||
Fresh restart of Infinity Vis with a smaller runtime surface and a lower-allocation render path.
|
||||
|
||||
This directory is intentionally separate from the legacy `app/` implementation.
|
||||
The goal is to keep the old tool usable while rebuilding the next version on a
|
||||
clean architecture.
|
||||
|
||||
## What is in here
|
||||
|
||||
- `FEATURE_INVENTORY.md`
|
||||
Current feature surface collected from the existing application.
|
||||
- `ARCHITECTURE.md`
|
||||
The new runtime design and the main performance goals.
|
||||
- `MIGRATION_STATUS.md`
|
||||
Honest parity tracking between the legacy app and the clean rebuild.
|
||||
- `infinity_vis_1/`
|
||||
New core package with mapping I/O, patterns, fast renderer, presets, and DDP output.
|
||||
- `tests/`
|
||||
Basic verification for the new mapping and renderer core.
|
||||
|
||||
## Current focus
|
||||
|
||||
Phase 1 is the performance-oriented core, not the full Qt feature-parity UI yet.
|
||||
|
||||
That means this new codebase already covers:
|
||||
|
||||
- loading and saving the existing XML mapping format
|
||||
- controller grouping for shared-IP multi-tile nodes
|
||||
- pattern rendering with a low-allocation tile-color pipeline
|
||||
- separate output and preview timing via a new runtime engine
|
||||
- direct DDP payload generation
|
||||
- delta sending with keepalive support for unchanged controller frames
|
||||
- preset persistence
|
||||
- benchmark tooling
|
||||
|
||||
## Run the benchmark
|
||||
|
||||
From the repo root:
|
||||
|
||||
```powershell
|
||||
python -m Infinity_Vis_1.infinity_vis_1.benchmark --frames 600
|
||||
python -m Infinity_Vis_1.infinity_vis_1.benchmark --mode engine --frames 600 --pattern solid
|
||||
```
|
||||
|
||||
## Start the live DDP sender
|
||||
|
||||
```powershell
|
||||
python -m Infinity_Vis_1.infinity_vis_1.live --mapping sample_data\infinity_mirror_mapping_clean.xml --pattern solid --fps 40
|
||||
```
|
||||
|
||||
Example for a harder live test:
|
||||
|
||||
```powershell
|
||||
python -m Infinity_Vis_1.infinity_vis_1.live --mapping sample_data\infinity_mirror_mapping_clean.xml --pattern strobe --fps 40 --strobe-mode global --no-delta
|
||||
```
|
||||
|
||||
## Use the runtime core
|
||||
|
||||
```python
|
||||
from Infinity_Vis_1.infinity_vis_1 import OutputSettings, RealtimeEngine, SceneState, load_mapping
|
||||
|
||||
mapping = load_mapping("sample_data/infinity_mirror_mapping_clean.xml")
|
||||
engine = RealtimeEngine(mapping, scene=SceneState(), settings=OutputSettings(output_fps=40, preview_fps=12))
|
||||
result = engine.tick()
|
||||
```
|
||||
|
||||
## Run the new tests
|
||||
|
||||
```powershell
|
||||
python -m unittest discover Infinity_Vis_1/tests
|
||||
```
|
||||
17
Infinity_Vis_1/infinity_vis_1/__init__.py
Normal file
17
Infinity_Vis_1/infinity_vis_1/__init__.py
Normal file
@@ -0,0 +1,17 @@
|
||||
"""Performance-oriented Infinity Vis restart."""
|
||||
|
||||
from .engine import OutputSettings, RealtimeEngine
|
||||
from .mapping_xml import load_mapping, save_mapping
|
||||
from .models import MappingSpec, PatternParams, SceneState
|
||||
from .renderer import FastRenderer
|
||||
|
||||
__all__ = [
|
||||
"FastRenderer",
|
||||
"MappingSpec",
|
||||
"OutputSettings",
|
||||
"PatternParams",
|
||||
"RealtimeEngine",
|
||||
"SceneState",
|
||||
"load_mapping",
|
||||
"save_mapping",
|
||||
]
|
||||
82
Infinity_Vis_1/infinity_vis_1/benchmark.py
Normal file
82
Infinity_Vis_1/infinity_vis_1/benchmark.py
Normal file
@@ -0,0 +1,82 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
import time
|
||||
|
||||
from .engine import OutputSettings, RealtimeEngine
|
||||
from .mapping_xml import load_mapping
|
||||
from .models import PatternParams, SceneState
|
||||
from .patterns import pattern_ids
|
||||
from .renderer import FastRenderer
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Benchmark the Infinity_Vis_1 render core.")
|
||||
parser.add_argument(
|
||||
"--mapping",
|
||||
type=Path,
|
||||
default=Path(__file__).resolve().parents[2] / "sample_data" / "infinity_mirror_mapping_clean.xml",
|
||||
help="Path to the XML mapping file.",
|
||||
)
|
||||
parser.add_argument("--frames", type=int, default=600, help="Frames per pattern.")
|
||||
parser.add_argument("--pattern", type=str, default="", help="Benchmark a single pattern id.")
|
||||
parser.add_argument(
|
||||
"--mode",
|
||||
type=str,
|
||||
default="render",
|
||||
choices=["render", "engine"],
|
||||
help="Benchmark only the renderer or the new runtime engine loop.",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
mapping = load_mapping(args.mapping)
|
||||
renderer = FastRenderer(mapping)
|
||||
stats = renderer.stats()
|
||||
|
||||
patterns = [args.pattern] if args.pattern else pattern_ids()
|
||||
print(f"Mapping: {mapping.name}")
|
||||
print(f"Tiles: {stats.tile_count} | Controllers: {stats.controller_count} | LEDs: {stats.total_leds}")
|
||||
|
||||
for pattern_id in patterns:
|
||||
scene = SceneState(pattern_id=pattern_id, params=PatternParams(), tempo_bpm=120.0)
|
||||
if args.mode == "engine":
|
||||
fps, preview_frames, changed_frames = _benchmark_engine(mapping, scene, max(1, args.frames))
|
||||
print(
|
||||
f"{pattern_id:16s} {fps:9.1f} engine fps "
|
||||
f"| preview {preview_frames:4d} | changed controllers {changed_frames:4d}"
|
||||
)
|
||||
else:
|
||||
fps = _benchmark_renderer(renderer, scene, max(1, args.frames))
|
||||
print(f"{pattern_id:16s} {fps:9.1f} render fps")
|
||||
|
||||
|
||||
def _benchmark_renderer(renderer: FastRenderer, scene: SceneState, frames: int) -> float:
|
||||
start = time.perf_counter()
|
||||
for frame_index in range(frames):
|
||||
renderer.render(scene, timestamp=start + frame_index / 60.0)
|
||||
elapsed = time.perf_counter() - start
|
||||
return frames / elapsed if elapsed > 0 else 0.0
|
||||
|
||||
|
||||
def _benchmark_engine(mapping, scene: SceneState, frames: int) -> tuple[float, int, int]:
|
||||
engine = RealtimeEngine(
|
||||
mapping,
|
||||
scene=scene,
|
||||
settings=OutputSettings(output_fps=60.0, preview_fps=12.0, delta_sending=True, keepalive_seconds=0.35),
|
||||
)
|
||||
start = time.perf_counter()
|
||||
preview_frames = 0
|
||||
changed_frames = 0
|
||||
for frame_index in range(frames):
|
||||
result = engine.tick(now=start + frame_index / 60.0)
|
||||
if result.preview_snapshot is not None:
|
||||
preview_frames += 1
|
||||
changed_frames += len(result.changed_payloads)
|
||||
elapsed = time.perf_counter() - start
|
||||
fps = frames / elapsed if elapsed > 0 else 0.0
|
||||
return fps, preview_frames, changed_frames
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
67
Infinity_Vis_1/infinity_vis_1/colors.py
Normal file
67
Infinity_Vis_1/infinity_vis_1/colors.py
Normal file
@@ -0,0 +1,67 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Iterable
|
||||
|
||||
from .models import RGB
|
||||
|
||||
|
||||
def clamp_byte(value: float) -> int:
|
||||
return max(0, min(255, int(round(value))))
|
||||
|
||||
|
||||
def rgb_from_hex(value: str) -> RGB:
|
||||
raw = str(value or "").strip().lstrip("#")
|
||||
if len(raw) != 6:
|
||||
return (255, 255, 255)
|
||||
try:
|
||||
return (int(raw[0:2], 16), int(raw[2:4], 16), int(raw[4:6], 16))
|
||||
except ValueError:
|
||||
return (255, 255, 255)
|
||||
|
||||
|
||||
def mix(a: RGB, b: RGB, amount: float) -> RGB:
|
||||
amount = max(0.0, min(1.0, float(amount)))
|
||||
return (
|
||||
clamp_byte(a[0] + (b[0] - a[0]) * amount),
|
||||
clamp_byte(a[1] + (b[1] - a[1]) * amount),
|
||||
clamp_byte(a[2] + (b[2] - a[2]) * amount),
|
||||
)
|
||||
|
||||
|
||||
def scale(color: RGB, factor: float) -> RGB:
|
||||
return (
|
||||
clamp_byte(color[0] * factor),
|
||||
clamp_byte(color[1] * factor),
|
||||
clamp_byte(color[2] * factor),
|
||||
)
|
||||
|
||||
|
||||
def average(colors: Iterable[RGB]) -> RGB:
|
||||
items = list(colors)
|
||||
if not items:
|
||||
return (0, 0, 0)
|
||||
return (
|
||||
clamp_byte(sum(color[0] for color in items) / len(items)),
|
||||
clamp_byte(sum(color[1] for color in items) / len(items)),
|
||||
clamp_byte(sum(color[2] for color in items) / len(items)),
|
||||
)
|
||||
|
||||
|
||||
PALETTES: dict[str, tuple[RGB, ...]] = {
|
||||
"Laser Club": ((77, 124, 255), (14, 22, 48), (255, 42, 109), (0, 255, 192)),
|
||||
"Afterhours": ((70, 30, 110), (15, 10, 35), (255, 90, 140), (255, 200, 120)),
|
||||
"Warehouse Heat": ((255, 120, 30), (55, 5, 0), (255, 210, 80), (180, 30, 10)),
|
||||
"Monochrome": ((255, 255, 255), (30, 30, 30)),
|
||||
}
|
||||
|
||||
|
||||
def palette_color(name: str, phase: float) -> RGB:
|
||||
palette = PALETTES.get(name, PALETTES["Laser Club"])
|
||||
if len(palette) == 1:
|
||||
return palette[0]
|
||||
phase = float(phase) % 1.0
|
||||
scaled_phase = phase * len(palette)
|
||||
index = int(scaled_phase) % len(palette)
|
||||
next_index = (index + 1) % len(palette)
|
||||
local = scaled_phase - int(scaled_phase)
|
||||
return mix(palette[index], palette[next_index], local)
|
||||
196
Infinity_Vis_1/infinity_vis_1/engine.py
Normal file
196
Infinity_Vis_1/infinity_vis_1/engine.py
Normal file
@@ -0,0 +1,196 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
import time
|
||||
|
||||
from .models import FrameSnapshot, MappingSpec, SceneState
|
||||
from .output.ddp import DDPClient
|
||||
from .renderer import FastRenderer
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class OutputSettings:
|
||||
output_fps: float = 40.0
|
||||
preview_fps: float = 12.0
|
||||
keepalive_seconds: float = 0.35
|
||||
delta_sending: bool = True
|
||||
preview_enabled: bool = True
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class RuntimeDiagnostics:
|
||||
tile_count: int
|
||||
controller_count: int
|
||||
total_leds: int
|
||||
output_frames: int = 0
|
||||
preview_frames: int = 0
|
||||
packets_built: int = 0
|
||||
packets_sent: int = 0
|
||||
changed_controller_frames: int = 0
|
||||
keepalive_controller_frames: int = 0
|
||||
stale_output_frames: int = 0
|
||||
last_render_ms: float = 0.0
|
||||
last_tick_at: float = 0.0
|
||||
last_output_at: float = 0.0
|
||||
last_preview_at: float = 0.0
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class TickResult:
|
||||
timestamp: float
|
||||
output_due: bool
|
||||
preview_due: bool
|
||||
output_snapshot: FrameSnapshot | None = None
|
||||
preview_snapshot: FrameSnapshot | None = None
|
||||
changed_payloads: dict[str, bytes] = field(default_factory=dict)
|
||||
packet_count: int = 0
|
||||
|
||||
|
||||
class RealtimeEngine:
|
||||
def __init__(
|
||||
self,
|
||||
mapping: MappingSpec,
|
||||
scene: SceneState | None = None,
|
||||
settings: OutputSettings | None = None,
|
||||
ddp_client: DDPClient | None = None,
|
||||
) -> None:
|
||||
self.mapping = mapping
|
||||
self.scene = scene or SceneState()
|
||||
self.settings = settings or OutputSettings()
|
||||
self.renderer = FastRenderer(mapping)
|
||||
self.ddp_client = ddp_client or DDPClient()
|
||||
|
||||
renderer_stats = self.renderer.stats()
|
||||
self.diagnostics = RuntimeDiagnostics(
|
||||
tile_count=renderer_stats.tile_count,
|
||||
controller_count=renderer_stats.controller_count,
|
||||
total_leds=renderer_stats.total_leds,
|
||||
)
|
||||
|
||||
self._last_output_at: float | None = None
|
||||
self._last_preview_at: float | None = None
|
||||
self._last_sent_payloads: dict[str, bytes] = {}
|
||||
self._last_sent_at: dict[str, float] = {}
|
||||
self._latest_snapshot: FrameSnapshot | None = None
|
||||
self._force_output = True
|
||||
self._force_preview = True
|
||||
|
||||
def set_scene(self, scene: SceneState) -> None:
|
||||
self.scene = scene
|
||||
self._force_output = True
|
||||
self._force_preview = True
|
||||
|
||||
def request_preview_refresh(self) -> None:
|
||||
self._force_preview = True
|
||||
|
||||
def latest_snapshot(self) -> FrameSnapshot | None:
|
||||
return self._latest_snapshot
|
||||
|
||||
def tick(self, now: float | None = None) -> TickResult:
|
||||
timestamp = time.perf_counter() if now is None else float(now)
|
||||
output_due = self._is_due(self._last_output_at, self._interval(self.settings.output_fps), timestamp, self._force_output)
|
||||
preview_due = self.settings.preview_enabled and self._is_due(
|
||||
self._last_preview_at,
|
||||
self._interval(self.settings.preview_fps),
|
||||
timestamp,
|
||||
self._force_preview,
|
||||
)
|
||||
if not output_due and not preview_due:
|
||||
return TickResult(timestamp=timestamp, output_due=False, preview_due=False)
|
||||
|
||||
render_started = time.perf_counter()
|
||||
snapshot = self.renderer.render(self.scene, timestamp=timestamp)
|
||||
self.diagnostics.last_render_ms = (time.perf_counter() - render_started) * 1000.0
|
||||
self.diagnostics.last_tick_at = timestamp
|
||||
self._latest_snapshot = snapshot
|
||||
|
||||
changed_payloads: dict[str, bytes] = {}
|
||||
packet_count = 0
|
||||
output_snapshot: FrameSnapshot | None = None
|
||||
preview_snapshot: FrameSnapshot | None = None
|
||||
|
||||
if output_due:
|
||||
changed_payloads, changed_count, keepalive_count = self._select_output_payloads(
|
||||
snapshot.controller_payloads,
|
||||
timestamp,
|
||||
)
|
||||
output_snapshot = snapshot
|
||||
self._last_output_at = timestamp
|
||||
self._force_output = False
|
||||
self.diagnostics.output_frames += 1
|
||||
self.diagnostics.changed_controller_frames += changed_count
|
||||
self.diagnostics.keepalive_controller_frames += keepalive_count
|
||||
self.diagnostics.last_output_at = timestamp
|
||||
if changed_payloads:
|
||||
packet_count = self.ddp_client.count_packets(changed_payloads)
|
||||
self.diagnostics.packets_built += packet_count
|
||||
else:
|
||||
self.diagnostics.stale_output_frames += 1
|
||||
|
||||
if preview_due:
|
||||
preview_snapshot = snapshot
|
||||
self._last_preview_at = timestamp
|
||||
self._force_preview = False
|
||||
self.diagnostics.preview_frames += 1
|
||||
self.diagnostics.last_preview_at = timestamp
|
||||
|
||||
return TickResult(
|
||||
timestamp=timestamp,
|
||||
output_due=output_due,
|
||||
preview_due=preview_due,
|
||||
output_snapshot=output_snapshot,
|
||||
preview_snapshot=preview_snapshot,
|
||||
changed_payloads=changed_payloads,
|
||||
packet_count=packet_count,
|
||||
)
|
||||
|
||||
def send_due(self, now: float | None = None) -> TickResult:
|
||||
result = self.tick(now=now)
|
||||
if result.changed_payloads:
|
||||
sent = self.ddp_client.send(result.changed_payloads)
|
||||
self.diagnostics.packets_sent += sent
|
||||
return result
|
||||
|
||||
def _interval(self, fps: float) -> float:
|
||||
if fps <= 0:
|
||||
return 0.0
|
||||
return 1.0 / float(fps)
|
||||
|
||||
def _is_due(self, last_at: float | None, interval: float, now: float, forced: bool) -> bool:
|
||||
if forced or last_at is None:
|
||||
return True
|
||||
if interval <= 0:
|
||||
return True
|
||||
return (now - last_at) >= max(0.0, interval - 1e-9)
|
||||
|
||||
def _select_output_payloads(self, payloads: dict[str, bytes], timestamp: float) -> tuple[dict[str, bytes], int, int]:
|
||||
if not self.settings.delta_sending:
|
||||
for host, payload in payloads.items():
|
||||
self._last_sent_payloads[host] = payload
|
||||
self._last_sent_at[host] = timestamp
|
||||
return dict(payloads), len(payloads), 0
|
||||
|
||||
changed_payloads: dict[str, bytes] = {}
|
||||
changed_count = 0
|
||||
keepalive_count = 0
|
||||
keepalive_seconds = max(0.0, float(self.settings.keepalive_seconds))
|
||||
|
||||
for host, payload in payloads.items():
|
||||
previous = self._last_sent_payloads.get(host)
|
||||
last_sent_at = self._last_sent_at.get(host)
|
||||
unchanged = previous == payload
|
||||
keepalive_due = (
|
||||
keepalive_seconds > 0.0
|
||||
and unchanged
|
||||
and last_sent_at is not None
|
||||
and (timestamp - last_sent_at) >= keepalive_seconds
|
||||
)
|
||||
if previous is None or not unchanged or keepalive_due:
|
||||
changed_payloads[host] = payload
|
||||
self._last_sent_payloads[host] = payload
|
||||
self._last_sent_at[host] = timestamp
|
||||
if keepalive_due and unchanged:
|
||||
keepalive_count += 1
|
||||
else:
|
||||
changed_count += 1
|
||||
return changed_payloads, changed_count, keepalive_count
|
||||
94
Infinity_Vis_1/infinity_vis_1/live.py
Normal file
94
Infinity_Vis_1/infinity_vis_1/live.py
Normal file
@@ -0,0 +1,94 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
import time
|
||||
|
||||
from .engine import OutputSettings, RealtimeEngine
|
||||
from .mapping_xml import load_mapping
|
||||
from .models import PatternParams, SceneState
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Run the Infinity_Vis_1 live DDP sender.")
|
||||
parser.add_argument(
|
||||
"--mapping",
|
||||
type=Path,
|
||||
default=Path(__file__).resolve().parents[2] / "sample_data" / "infinity_mirror_mapping_clean.xml",
|
||||
help="Path to the XML mapping file.",
|
||||
)
|
||||
parser.add_argument("--pattern", type=str, default="solid", help="Pattern id.")
|
||||
parser.add_argument("--tempo", type=float, default=120.0, help="Tempo in BPM.")
|
||||
parser.add_argument("--fps", type=float, default=40.0, help="Target output FPS.")
|
||||
parser.add_argument("--preview-fps", type=float, default=0.0, help="Preview cadence for snapshot generation.")
|
||||
parser.add_argument("--duration", type=float, default=0.0, help="Optional runtime limit in seconds. 0 means until Ctrl+C.")
|
||||
parser.add_argument("--primary-color", type=str, default="#4D7CFF", help="Primary color in #RRGGBB.")
|
||||
parser.add_argument("--secondary-color", type=str, default="#0E1630", help="Secondary color in #RRGGBB.")
|
||||
parser.add_argument("--palette", type=str, default="Laser Club", help="Palette name.")
|
||||
parser.add_argument("--brightness", type=float, default=1.0, help="Global pattern brightness 0..1.")
|
||||
parser.add_argument("--tempo-multiplier", type=float, default=1.0, help="Pattern tempo multiplier.")
|
||||
parser.add_argument("--color-mode", type=str, default="dual", help="Color mode, for example dual or palette.")
|
||||
parser.add_argument("--strobe-mode", type=str, default="global", help="Strobe mode.")
|
||||
parser.add_argument("--keepalive", type=float, default=0.35, help="Keepalive interval for unchanged payloads.")
|
||||
parser.add_argument("--no-delta", action="store_true", help="Always send every controller every frame.")
|
||||
args = parser.parse_args()
|
||||
|
||||
mapping = load_mapping(args.mapping)
|
||||
scene = SceneState(
|
||||
pattern_id=args.pattern,
|
||||
tempo_bpm=args.tempo,
|
||||
params=PatternParams(
|
||||
brightness=max(0.0, min(1.0, args.brightness)),
|
||||
tempo_multiplier=args.tempo_multiplier,
|
||||
color_mode=args.color_mode,
|
||||
primary_color=args.primary_color,
|
||||
secondary_color=args.secondary_color,
|
||||
palette=args.palette,
|
||||
strobe_mode=args.strobe_mode,
|
||||
),
|
||||
)
|
||||
settings = OutputSettings(
|
||||
output_fps=max(1.0, args.fps),
|
||||
preview_fps=max(0.0, args.preview_fps),
|
||||
keepalive_seconds=max(0.0, args.keepalive),
|
||||
delta_sending=not args.no_delta,
|
||||
preview_enabled=args.preview_fps > 0.0,
|
||||
)
|
||||
engine = RealtimeEngine(mapping, scene=scene, settings=settings)
|
||||
deadline = time.perf_counter() + args.duration if args.duration > 0 else None
|
||||
last_report = time.perf_counter()
|
||||
|
||||
print(f"Infinity_Vis_1 live sender started")
|
||||
print(f"Mapping: {mapping.name}")
|
||||
print(f"Pattern: {scene.pattern_id} | Tempo: {scene.tempo_bpm:.1f} BPM | Output FPS: {settings.output_fps:.1f}")
|
||||
print(f"Controllers: {engine.diagnostics.controller_count} | Tiles: {engine.diagnostics.tile_count} | LEDs: {engine.diagnostics.total_leds}")
|
||||
print("Press Ctrl+C to stop.")
|
||||
|
||||
try:
|
||||
while True:
|
||||
now = time.perf_counter()
|
||||
if deadline is not None and now >= deadline:
|
||||
break
|
||||
engine.send_due(now=now)
|
||||
if now - last_report >= 1.0:
|
||||
diagnostics = engine.diagnostics
|
||||
print(
|
||||
"output={output_frames} preview={preview_frames} packets={packets_sent} "
|
||||
"changed={changed} keepalive={keepalive} stale={stale} render={render_ms:.2f} ms".format(
|
||||
output_frames=diagnostics.output_frames,
|
||||
preview_frames=diagnostics.preview_frames,
|
||||
packets_sent=diagnostics.packets_sent,
|
||||
changed=diagnostics.changed_controller_frames,
|
||||
keepalive=diagnostics.keepalive_controller_frames,
|
||||
stale=diagnostics.stale_output_frames,
|
||||
render_ms=diagnostics.last_render_ms,
|
||||
)
|
||||
)
|
||||
last_report = now
|
||||
time.sleep(0.001)
|
||||
except KeyboardInterrupt:
|
||||
print("Stopped by user.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
263
Infinity_Vis_1/infinity_vis_1/mapping_xml.py
Normal file
263
Infinity_Vis_1/infinity_vis_1/mapping_xml.py
Normal file
@@ -0,0 +1,263 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
import ipaddress
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from .models import MappingSpec, SegmentSpec, TileSpec
|
||||
|
||||
|
||||
class MappingError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
def _req(node: ET.Element, key: str) -> str:
|
||||
value = node.get(key)
|
||||
if value is None:
|
||||
raise MappingError(f"Missing required attribute {key!r} on <{node.tag}>.")
|
||||
return value
|
||||
|
||||
|
||||
def _get_int(node: ET.Element | None, key: str, default: int = 0) -> int:
|
||||
if node is None:
|
||||
return default
|
||||
raw = node.get(key)
|
||||
if raw is None:
|
||||
return default
|
||||
try:
|
||||
return int(raw)
|
||||
except ValueError as exc:
|
||||
raise MappingError(f"Invalid integer {raw!r} for {key!r} on <{node.tag}>.") from exc
|
||||
|
||||
|
||||
def _get_float(node: ET.Element | None, key: str, default: float = 0.0) -> float:
|
||||
if node is None:
|
||||
return default
|
||||
raw = node.get(key)
|
||||
if raw is None:
|
||||
return default
|
||||
try:
|
||||
return float(raw)
|
||||
except ValueError as exc:
|
||||
raise MappingError(f"Invalid float {raw!r} for {key!r} on <{node.tag}>.") from exc
|
||||
|
||||
|
||||
def _get_bool(node: ET.Element | None, key: str, default: bool = False) -> bool:
|
||||
if node is None:
|
||||
return default
|
||||
raw = node.get(key)
|
||||
if raw is None:
|
||||
return default
|
||||
return raw.strip().lower() in {"1", "true", "yes", "on"}
|
||||
|
||||
|
||||
def load_mapping(path: str | Path) -> MappingSpec:
|
||||
xml_text = Path(path).read_text(encoding="utf-8")
|
||||
mapping = load_mapping_from_string(xml_text)
|
||||
mapping.file_path = Path(path)
|
||||
return mapping
|
||||
|
||||
|
||||
def load_mapping_from_string(xml_text: str) -> MappingSpec:
|
||||
try:
|
||||
root = ET.fromstring(xml_text)
|
||||
except ET.ParseError as exc:
|
||||
raise MappingError(f"XML parse error: {exc}") from exc
|
||||
|
||||
if root.tag != "InfinityMirrorConfig":
|
||||
raise MappingError(f"Unexpected root element <{root.tag}>.")
|
||||
|
||||
logical = root.find("LogicalDisplay")
|
||||
defaults = root.find("Defaults")
|
||||
tiles_node = root.find("Tiles")
|
||||
if tiles_node is None:
|
||||
raise MappingError("Missing <Tiles> node.")
|
||||
|
||||
tiles: list[TileSpec] = []
|
||||
for tile_node in tiles_node.findall("Tile"):
|
||||
segments_node = tile_node.find("Segments")
|
||||
segments: list[SegmentSpec] = []
|
||||
for segment_node in segments_node.findall("Segment") if segments_node is not None else []:
|
||||
segments.append(
|
||||
SegmentSpec(
|
||||
name=segment_node.get("name", ""),
|
||||
side=segment_node.get("side", ""),
|
||||
start_channel=_get_int(segment_node, "startChannel", 1),
|
||||
led_count=_get_int(segment_node, "ledCount", 0),
|
||||
reverse=_get_bool(segment_node, "reverse", False),
|
||||
orientation_rad=_get_float(segment_node, "orientationRad", 0.0),
|
||||
x0=_get_float(segment_node, "x0", 0.0),
|
||||
y0=_get_float(segment_node, "y0", 0.0),
|
||||
x1=_get_float(segment_node, "x1", 0.0),
|
||||
y1=_get_float(segment_node, "y1", 0.0),
|
||||
)
|
||||
)
|
||||
|
||||
calibration_node = tile_node.find("Calibration")
|
||||
tiles.append(
|
||||
TileSpec(
|
||||
tile_id=_req(tile_node, "id"),
|
||||
row=_get_int(tile_node, "row", 0),
|
||||
col=_get_int(tile_node, "col", 0),
|
||||
led_total=_get_int(tile_node, "ledTotal", 0),
|
||||
controller_ip=tile_node.get("ip", "").strip(),
|
||||
screen_name=tile_node.get("screenName", ""),
|
||||
controller_host=tile_node.get("controllerHost", ""),
|
||||
controller_name=tile_node.get("controllerName", ""),
|
||||
controller_mac=tile_node.get("controllerMac", ""),
|
||||
universe=_get_int(tile_node, "universe", 0),
|
||||
subnet=_get_int(tile_node, "subnet", 0),
|
||||
enabled=_get_bool(tile_node, "enabled", True),
|
||||
brightness=_get_float(calibration_node, "brightness", 1.0),
|
||||
x0=_get_float(tile_node, "x0", 0.0),
|
||||
y0=_get_float(tile_node, "y0", 0.0),
|
||||
x1=_get_float(tile_node, "x1", 0.0),
|
||||
y1=_get_float(tile_node, "y1", 0.0),
|
||||
segments=sorted(segments, key=lambda item: (item.start_channel, item.name)),
|
||||
)
|
||||
)
|
||||
|
||||
mapping = MappingSpec(
|
||||
name=root.get("name", "Infinity Vis 1"),
|
||||
rows=_get_int(logical, "rows", 3),
|
||||
cols=_get_int(logical, "cols", 6),
|
||||
tiles=tiles,
|
||||
protocol=defaults.findtext("protocol", default="ddp") if defaults is not None else "ddp",
|
||||
subnet=int(defaults.findtext("subnet", default="0")) if defaults is not None else 0,
|
||||
color_format=defaults.findtext("colorFormat", default="rgb") if defaults is not None else "rgb",
|
||||
tile_behavior=defaults.findtext("tileBehavior", default="solid_color_per_tile")
|
||||
if defaults is not None
|
||||
else "solid_color_per_tile",
|
||||
global_gamma=float(defaults.findtext("globalGamma", default="2.2")) if defaults is not None else 2.2,
|
||||
)
|
||||
validate_mapping(mapping)
|
||||
return mapping
|
||||
|
||||
|
||||
def validate_mapping(mapping: MappingSpec) -> None:
|
||||
if mapping.rows <= 0 or mapping.cols <= 0:
|
||||
raise MappingError("Rows and columns must be positive.")
|
||||
|
||||
if len(mapping.tiles) != mapping.rows * mapping.cols:
|
||||
raise MappingError(
|
||||
f"Expected {mapping.rows * mapping.cols} tiles for a {mapping.rows}x{mapping.cols} grid, found {len(mapping.tiles)}."
|
||||
)
|
||||
|
||||
seen_ids: set[str] = set()
|
||||
seen_positions: set[tuple[int, int]] = set()
|
||||
for tile in mapping.tiles:
|
||||
if tile.tile_id in seen_ids:
|
||||
raise MappingError(f"Duplicate tile id {tile.tile_id!r}.")
|
||||
seen_ids.add(tile.tile_id)
|
||||
|
||||
pos = (tile.row, tile.col)
|
||||
if pos in seen_positions:
|
||||
raise MappingError(f"Duplicate tile position row={tile.row}, col={tile.col}.")
|
||||
seen_positions.add(pos)
|
||||
|
||||
if tile.controller_ip:
|
||||
try:
|
||||
ipaddress.ip_address(tile.controller_ip)
|
||||
except ValueError as exc:
|
||||
raise MappingError(f"Invalid controller IP {tile.controller_ip!r} on {tile.tile_id}.") from exc
|
||||
|
||||
segment_leds = sum(max(0, segment.led_count) for segment in tile.segments)
|
||||
if tile.segments and segment_leds != tile.led_total:
|
||||
raise MappingError(
|
||||
f"{tile.tile_id}: ledTotal={tile.led_total} does not match segment sum {segment_leds}."
|
||||
)
|
||||
|
||||
|
||||
def save_mapping(mapping: MappingSpec, path: str | Path) -> None:
|
||||
validate_mapping(mapping)
|
||||
Path(path).write_text(mapping_to_xml_string(mapping), encoding="utf-8")
|
||||
|
||||
|
||||
def mapping_to_xml_string(mapping: MappingSpec) -> str:
|
||||
root = ET.Element("InfinityMirrorConfig", {"name": mapping.name, "version": "1.0"})
|
||||
|
||||
source = ET.SubElement(root, "Source")
|
||||
ET.SubElement(source, "OriginalExport").text = "Infinity_Vis_1"
|
||||
ET.SubElement(source, "DerivedFrom").text = "Infinity_Vis_1"
|
||||
ET.SubElement(source, "OriginalComposition", {"width": "1200", "height": "600"})
|
||||
|
||||
ET.SubElement(
|
||||
root,
|
||||
"LogicalDisplay",
|
||||
{
|
||||
"rows": str(mapping.rows),
|
||||
"cols": str(mapping.cols),
|
||||
"previewWidth": "1200",
|
||||
"previewHeight": "600",
|
||||
"tileWidth": "200",
|
||||
"tileHeight": "200",
|
||||
},
|
||||
)
|
||||
|
||||
defaults = ET.SubElement(root, "Defaults")
|
||||
ET.SubElement(defaults, "protocol").text = mapping.protocol
|
||||
ET.SubElement(defaults, "subnet").text = str(mapping.subnet)
|
||||
ET.SubElement(defaults, "colorFormat").text = mapping.color_format
|
||||
ET.SubElement(defaults, "tileBehavior").text = mapping.tile_behavior
|
||||
ET.SubElement(defaults, "globalGamma").text = _fmt(mapping.global_gamma)
|
||||
|
||||
tiles_node = ET.SubElement(root, "Tiles")
|
||||
for tile in mapping.ordered_tiles():
|
||||
attrs = {
|
||||
"id": tile.tile_id,
|
||||
"row": str(tile.row),
|
||||
"col": str(tile.col),
|
||||
"screenName": tile.screen_name,
|
||||
"ip": tile.controller_ip,
|
||||
"universe": str(tile.universe),
|
||||
"subnet": str(tile.subnet),
|
||||
"ledTotal": str(tile.led_total),
|
||||
"x0": _fmt(tile.x0),
|
||||
"y0": _fmt(tile.y0),
|
||||
"x1": _fmt(tile.x1),
|
||||
"y1": _fmt(tile.y1),
|
||||
"enabled": "true" if tile.enabled else "false",
|
||||
}
|
||||
if tile.controller_host:
|
||||
attrs["controllerHost"] = tile.controller_host
|
||||
if tile.controller_name:
|
||||
attrs["controllerName"] = tile.controller_name
|
||||
if tile.controller_mac:
|
||||
attrs["controllerMac"] = tile.controller_mac
|
||||
tile_node = ET.SubElement(tiles_node, "Tile", attrs)
|
||||
ET.SubElement(
|
||||
tile_node,
|
||||
"Calibration",
|
||||
{
|
||||
"brightness": _fmt(tile.brightness),
|
||||
"redGain": "1",
|
||||
"greenGain": "1",
|
||||
"blueGain": "1",
|
||||
},
|
||||
)
|
||||
segments_node = ET.SubElement(tile_node, "Segments")
|
||||
for segment in tile.segments:
|
||||
ET.SubElement(
|
||||
segments_node,
|
||||
"Segment",
|
||||
{
|
||||
"name": segment.name,
|
||||
"side": segment.side,
|
||||
"startChannel": str(segment.start_channel),
|
||||
"ledCount": str(segment.led_count),
|
||||
"orientationRad": _fmt(segment.orientation_rad),
|
||||
"x0": _fmt(segment.x0),
|
||||
"y0": _fmt(segment.y0),
|
||||
"x1": _fmt(segment.x1),
|
||||
"y1": _fmt(segment.y1),
|
||||
"reverse": "true" if segment.reverse else "false",
|
||||
},
|
||||
)
|
||||
|
||||
ET.indent(root, space=" ")
|
||||
return ET.tostring(root, encoding="unicode", xml_declaration=True)
|
||||
|
||||
|
||||
def _fmt(value: float) -> str:
|
||||
text = f"{float(value):.6f}".rstrip("0").rstrip(".")
|
||||
return text if text else "0"
|
||||
164
Infinity_Vis_1/infinity_vis_1/models.py
Normal file
164
Infinity_Vis_1/infinity_vis_1/models.py
Normal file
@@ -0,0 +1,164 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
|
||||
RGB = tuple[int, int, int]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class SegmentSpec:
|
||||
name: str
|
||||
side: str
|
||||
start_channel: int
|
||||
led_count: int
|
||||
reverse: bool = False
|
||||
orientation_rad: float = 0.0
|
||||
x0: float = 0.0
|
||||
y0: float = 0.0
|
||||
x1: float = 0.0
|
||||
y1: float = 0.0
|
||||
|
||||
@property
|
||||
def byte_offset(self) -> int:
|
||||
return max(0, self.start_channel - 1)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class TileSpec:
|
||||
tile_id: str
|
||||
row: int
|
||||
col: int
|
||||
led_total: int
|
||||
controller_ip: str = ""
|
||||
screen_name: str = ""
|
||||
controller_host: str = ""
|
||||
controller_name: str = ""
|
||||
controller_mac: str = ""
|
||||
universe: int = 0
|
||||
subnet: int = 0
|
||||
enabled: bool = True
|
||||
brightness: float = 1.0
|
||||
x0: float = 0.0
|
||||
y0: float = 0.0
|
||||
x1: float = 0.0
|
||||
y1: float = 0.0
|
||||
segments: list[SegmentSpec] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def controller_payload_bytes(self) -> int:
|
||||
return max(0, self.led_total) * 3
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ControllerTileRoute:
|
||||
tile_id: str
|
||||
controller_offset_bytes: int
|
||||
segment_offsets: tuple[tuple[int, int], ...]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ControllerSpec:
|
||||
host: str
|
||||
payload_bytes: int
|
||||
tiles: tuple[ControllerTileRoute, ...]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MappingSpec:
|
||||
name: str = "Infinity Vis 1"
|
||||
rows: int = 3
|
||||
cols: int = 6
|
||||
tiles: list[TileSpec] = field(default_factory=list)
|
||||
file_path: Path | None = None
|
||||
protocol: str = "ddp"
|
||||
subnet: int = 0
|
||||
color_format: str = "rgb"
|
||||
tile_behavior: str = "solid_color_per_tile"
|
||||
global_gamma: float = 2.2
|
||||
|
||||
def ordered_tiles(self) -> list[TileSpec]:
|
||||
return sorted(self.tiles, key=lambda tile: (tile.row, tile.col, tile.tile_id))
|
||||
|
||||
def tile_lookup(self) -> dict[str, TileSpec]:
|
||||
return {tile.tile_id: tile for tile in self.tiles}
|
||||
|
||||
def build_controllers(self) -> list[ControllerSpec]:
|
||||
groups: dict[str, list[TileSpec]] = {}
|
||||
for tile in self.ordered_tiles():
|
||||
if not tile.enabled:
|
||||
continue
|
||||
host = tile.controller_ip.strip()
|
||||
if not host:
|
||||
continue
|
||||
groups.setdefault(host, []).append(tile)
|
||||
|
||||
controllers: list[ControllerSpec] = []
|
||||
for host, tiles in groups.items():
|
||||
running_offset = 0
|
||||
routes: list[ControllerTileRoute] = []
|
||||
for tile in tiles:
|
||||
segment_offsets = tuple(
|
||||
(running_offset + segment.byte_offset, segment.led_count)
|
||||
for segment in sorted(tile.segments, key=lambda item: (item.start_channel, item.name))
|
||||
)
|
||||
routes.append(
|
||||
ControllerTileRoute(
|
||||
tile_id=tile.tile_id,
|
||||
controller_offset_bytes=running_offset,
|
||||
segment_offsets=segment_offsets,
|
||||
)
|
||||
)
|
||||
running_offset += tile.controller_payload_bytes
|
||||
controllers.append(
|
||||
ControllerSpec(
|
||||
host=host,
|
||||
payload_bytes=running_offset,
|
||||
tiles=tuple(routes),
|
||||
)
|
||||
)
|
||||
return sorted(controllers, key=lambda item: item.host)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class PatternParams:
|
||||
brightness: float = 1.0
|
||||
tempo_multiplier: float = 1.0
|
||||
direction: str = "left_to_right"
|
||||
checker_mode: str = "classic"
|
||||
scan_style: str = "line"
|
||||
angle: float = 0.0
|
||||
on_width: float = 1.0
|
||||
off_width: float = 1.0
|
||||
strobe_mode: str = "global"
|
||||
stopwatch_mode: str = "sync"
|
||||
color_mode: str = "dual"
|
||||
primary_color: str = "#4D7CFF"
|
||||
secondary_color: str = "#0E1630"
|
||||
palette: str = "Laser Club"
|
||||
symmetry: str = "none"
|
||||
center_pulse_mode: str = "expand"
|
||||
step_size: float = 1.0
|
||||
block_size: float = 1.0
|
||||
pixel_group_size: float = 1.0
|
||||
strobe_duty_cycle: float = 0.5
|
||||
randomness: float = 0.35
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class SceneState:
|
||||
pattern_id: str = "solid"
|
||||
params: PatternParams = field(default_factory=PatternParams)
|
||||
tempo_bpm: float = 120.0
|
||||
utility_mode: Literal["none", "blackout", "single_tile", "identify"] = "none"
|
||||
selected_tile_id: str | None = None
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FrameSnapshot:
|
||||
timestamp: float
|
||||
pattern_id: str
|
||||
tile_colors: list[RGB]
|
||||
controller_payloads: dict[str, bytes]
|
||||
1
Infinity_Vis_1/infinity_vis_1/output/__init__.py
Normal file
1
Infinity_Vis_1/infinity_vis_1/output/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Output backends for Infinity_Vis_1."""
|
||||
56
Infinity_Vis_1/infinity_vis_1/output/ddp.py
Normal file
56
Infinity_Vis_1/infinity_vis_1/output/ddp.py
Normal file
@@ -0,0 +1,56 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import socket
|
||||
import struct
|
||||
|
||||
|
||||
DDP_DEFAULT_PORT = 4048
|
||||
DDP_MAX_DATA_LENGTH = 1440
|
||||
DDP_VERSION_1 = 0x40
|
||||
DDP_PUSH_FLAG = 0x01
|
||||
DDP_RGB888 = 0x0B
|
||||
DDP_DESTINATION_ID = 1
|
||||
|
||||
|
||||
class DDPClient:
|
||||
def __init__(self, port: int = DDP_DEFAULT_PORT) -> None:
|
||||
self.port = int(port)
|
||||
self._sequence = 1
|
||||
|
||||
def count_packets(self, host_payloads: dict[str, bytes]) -> int:
|
||||
packet_count = 0
|
||||
for payload in host_payloads.values():
|
||||
chunks, remainder = divmod(len(payload), DDP_MAX_DATA_LENGTH)
|
||||
packet_count += chunks + (1 if remainder else 0)
|
||||
return packet_count
|
||||
|
||||
def build_packets(self, host_payloads: dict[str, bytes]) -> list[tuple[str, bytes]]:
|
||||
packets: list[tuple[str, bytes]] = []
|
||||
sequence = self._next_sequence()
|
||||
for host, payload in host_payloads.items():
|
||||
for offset in range(0, len(payload), DDP_MAX_DATA_LENGTH):
|
||||
chunk = payload[offset : offset + DDP_MAX_DATA_LENGTH]
|
||||
last = offset + DDP_MAX_DATA_LENGTH >= len(payload)
|
||||
header = struct.pack(
|
||||
"!BBBBLH",
|
||||
DDP_VERSION_1 | (DDP_PUSH_FLAG if last else 0),
|
||||
sequence,
|
||||
DDP_RGB888,
|
||||
DDP_DESTINATION_ID,
|
||||
offset,
|
||||
len(chunk),
|
||||
)
|
||||
packets.append((host, header + chunk))
|
||||
return packets
|
||||
|
||||
def send(self, host_payloads: dict[str, bytes]) -> int:
|
||||
packets = self.build_packets(host_payloads)
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
|
||||
for host, packet in packets:
|
||||
sock.sendto(packet, (host, self.port))
|
||||
return len(packets)
|
||||
|
||||
def _next_sequence(self) -> int:
|
||||
sequence = self._sequence
|
||||
self._sequence = 1 if self._sequence >= 15 else self._sequence + 1
|
||||
return sequence
|
||||
257
Infinity_Vis_1/infinity_vis_1/patterns.py
Normal file
257
Infinity_Vis_1/infinity_vis_1/patterns.py
Normal file
@@ -0,0 +1,257 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import random
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .colors import mix, palette_color, rgb_from_hex, scale
|
||||
from .models import MappingSpec, PatternParams, RGB
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class TilePoint:
|
||||
row: int
|
||||
col: int
|
||||
u: float
|
||||
v: float
|
||||
index: int
|
||||
|
||||
@property
|
||||
def tile_key(self) -> str:
|
||||
return f"{self.row}:{self.col}"
|
||||
|
||||
|
||||
def pattern_ids() -> list[str]:
|
||||
return sorted(PATTERN_REGISTRY)
|
||||
|
||||
|
||||
def render_pattern(mapping: MappingSpec, pattern_id: str, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
points = _tile_points(mapping)
|
||||
renderer = PATTERN_REGISTRY.get(pattern_id, _render_solid)
|
||||
return renderer(points, mapping, params, time_s, tempo_bpm)
|
||||
|
||||
|
||||
def utility_pattern(mapping: MappingSpec, utility_mode: str, selected_tile_id: str | None, time_s: float) -> list[RGB]:
|
||||
points = _tile_points(mapping)
|
||||
ordered = mapping.ordered_tiles()
|
||||
if utility_mode == "blackout":
|
||||
return [(0, 0, 0) for _ in points]
|
||||
if utility_mode == "identify":
|
||||
flash = _wave(time_s * 8.0)
|
||||
color = (255, 255, 255) if flash > 0.5 else (0, 0, 0)
|
||||
return [color for _ in points]
|
||||
if utility_mode == "single_tile":
|
||||
return [(255, 255, 255) if ordered[index].tile_id == selected_tile_id else (0, 0, 0) for index, _point in enumerate(points)]
|
||||
return [(0, 0, 0) for _ in points]
|
||||
|
||||
|
||||
def _tile_points(mapping: MappingSpec) -> list[TilePoint]:
|
||||
rows = max(1, mapping.rows - 1)
|
||||
cols = max(1, mapping.cols - 1)
|
||||
points: list[TilePoint] = []
|
||||
for index, tile in enumerate(mapping.ordered_tiles()):
|
||||
points.append(
|
||||
TilePoint(
|
||||
row=tile.row,
|
||||
col=tile.col,
|
||||
u=(tile.col - 1) / cols,
|
||||
v=(tile.row - 1) / rows,
|
||||
index=index,
|
||||
)
|
||||
)
|
||||
return points
|
||||
|
||||
|
||||
def _primary_secondary(params: PatternParams, phase: float = 0.5) -> tuple[RGB, RGB]:
|
||||
if params.color_mode == "palette":
|
||||
return palette_color(params.palette, phase), palette_color(params.palette, (phase + 0.33) % 1.0)
|
||||
return rgb_from_hex(params.primary_color), rgb_from_hex(params.secondary_color)
|
||||
|
||||
|
||||
def _wave(value: float) -> float:
|
||||
return 0.5 + 0.5 * math.sin(value)
|
||||
|
||||
|
||||
def _axis_projection(point: TilePoint, angle_deg: float) -> float:
|
||||
angle = math.radians(angle_deg % 360.0)
|
||||
x = point.u - 0.5
|
||||
y = point.v - 0.5
|
||||
return (x * math.cos(angle)) + (y * math.sin(angle))
|
||||
|
||||
|
||||
def _direction_progress(point: TilePoint, direction: str) -> float:
|
||||
if direction == "right_to_left":
|
||||
return 1.0 - point.u
|
||||
if direction == "bottom_to_top":
|
||||
return 1.0 - point.v
|
||||
if direction == "top_to_bottom":
|
||||
return point.v
|
||||
return point.u
|
||||
|
||||
|
||||
def _render_solid(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
primary, _secondary = _primary_secondary(params, 0.55)
|
||||
return [scale(primary, params.brightness) for _ in points]
|
||||
|
||||
|
||||
def _render_checker(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
return [scale(a if (point.row + point.col) % 2 == 0 else b, params.brightness) for point in points]
|
||||
|
||||
|
||||
def _render_row_gradient(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
return [scale(mix(a, b, point.v), params.brightness) for point in points]
|
||||
|
||||
|
||||
def _render_column_gradient(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
return [scale(mix(a, b, point.u), params.brightness) for point in points]
|
||||
|
||||
|
||||
def _render_breathing(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, _b = _primary_secondary(params, 0.4)
|
||||
pulse = 0.15 + 0.85 * _wave(time_s * (tempo_bpm / 60.0) * params.tempo_multiplier)
|
||||
return [scale(a, params.brightness * pulse) for _ in points]
|
||||
|
||||
|
||||
def _render_center_pulse(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
pulse = _wave(time_s * (tempo_bpm / 60.0) * params.tempo_multiplier)
|
||||
colors: list[RGB] = []
|
||||
for point in points:
|
||||
dx = point.u - 0.5
|
||||
dy = point.v - 0.5
|
||||
dist = math.sqrt(dx * dx + dy * dy) * 1.5
|
||||
strength = max(0.0, 1.0 - abs(dist - pulse))
|
||||
colors.append(scale(mix(b, a, strength), params.brightness))
|
||||
return colors
|
||||
|
||||
|
||||
def _render_sparkle(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
rng = random.Random(int(time_s * 120.0))
|
||||
return [scale(a if rng.random() < (0.15 + params.randomness * 0.35) else b, params.brightness) for _ in points]
|
||||
|
||||
|
||||
def _render_wave_line(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
phase = time_s * (tempo_bpm / 60.0) * params.tempo_multiplier
|
||||
colors: list[RGB] = []
|
||||
for point in points:
|
||||
value = _wave((point.u * math.pi * 4.0) + phase * math.pi * 2.0 + point.v * 1.2)
|
||||
colors.append(scale(mix(b, a, value), params.brightness))
|
||||
return colors
|
||||
|
||||
|
||||
def _render_scan(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
position = ((time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0) * 2.0 - 1.0
|
||||
band = max(0.08, min(1.2, params.on_width * 0.35))
|
||||
colors: list[RGB] = []
|
||||
for point in points:
|
||||
projection = _axis_projection(point, params.angle)
|
||||
active = abs(projection - position) <= band
|
||||
colors.append(scale(a if active else b, params.brightness))
|
||||
return colors
|
||||
|
||||
|
||||
def _render_arrow(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
head = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
|
||||
colors: list[RGB] = []
|
||||
for point in points:
|
||||
progress = _direction_progress(point, params.direction)
|
||||
width = 0.15 + 0.25 * (1.0 - abs(point.v - 0.5) * 2.0)
|
||||
colors.append(scale(a if abs(progress - head) < width else b, params.brightness))
|
||||
return colors
|
||||
|
||||
|
||||
def _render_scan_dual(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
progress = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
|
||||
colors: list[RGB] = []
|
||||
for point in points:
|
||||
axis = _direction_progress(point, params.direction)
|
||||
active = abs(axis - progress) < 0.2 or abs(axis - (1.0 - progress)) < 0.2
|
||||
colors.append(scale(a if active else b, params.brightness))
|
||||
return colors
|
||||
|
||||
|
||||
def _render_sweep(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
progress = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
|
||||
return [scale(a if _direction_progress(point, params.direction) <= progress else b, params.brightness) for point in points]
|
||||
|
||||
|
||||
def _render_saw(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
shift = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
|
||||
return [scale(mix(b, a, (_direction_progress(point, params.direction) + shift) % 1.0), params.brightness) for point in points]
|
||||
|
||||
|
||||
def _render_two_dots(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
p1 = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
|
||||
p2 = (p1 + 0.5) % 1.0
|
||||
colors: list[RGB] = []
|
||||
for point in points:
|
||||
axis = _direction_progress(point, params.direction)
|
||||
active = abs(axis - p1) < 0.12 or abs(axis - p2) < 0.12
|
||||
colors.append(scale(a if active else b, params.brightness))
|
||||
return colors
|
||||
|
||||
|
||||
def _render_strobe(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
phase = (time_s * (tempo_bpm / 60.0) * params.tempo_multiplier) % 1.0
|
||||
on = phase < max(0.02, min(0.98, params.strobe_duty_cycle))
|
||||
if params.strobe_mode == "global":
|
||||
return [scale(a if on else b, params.brightness) for _ in points]
|
||||
|
||||
rng = random.Random(int(time_s * 80.0))
|
||||
result: list[RGB] = []
|
||||
for point in points:
|
||||
local_on = on
|
||||
if params.strobe_mode in {"random_pixels", "random_leds"}:
|
||||
local_on = rng.random() > 0.5
|
||||
elif params.strobe_mode == "checker":
|
||||
local_on = on and ((point.row + point.col) % 2 == 0)
|
||||
result.append(scale(a if local_on else b, params.brightness))
|
||||
return result
|
||||
|
||||
|
||||
def _render_stopwatch(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
highlight = int(time_s) % max(1, len(points))
|
||||
return [scale(a if point.index == highlight else b, params.brightness) for point in points]
|
||||
|
||||
|
||||
def _render_snake(points: list[TilePoint], mapping: MappingSpec, params: PatternParams, time_s: float, tempo_bpm: float) -> list[RGB]:
|
||||
a, b = _primary_secondary(params)
|
||||
order = sorted(points, key=lambda point: (point.row, point.col if point.row % 2 else -point.col))
|
||||
head = int(time_s * (tempo_bpm / 60.0) * params.tempo_multiplier * max(1.0, params.step_size)) % len(order)
|
||||
tail = max(1, int(params.block_size))
|
||||
active = {order[(head - offset) % len(order)].tile_key for offset in range(tail)}
|
||||
return [scale(a if point.tile_key in active else b, params.brightness) for point in points]
|
||||
|
||||
|
||||
PATTERN_REGISTRY = {
|
||||
"arrow": _render_arrow,
|
||||
"breathing": _render_breathing,
|
||||
"center_pulse": _render_center_pulse,
|
||||
"checker": _render_checker,
|
||||
"column_gradient": _render_column_gradient,
|
||||
"row_gradient": _render_row_gradient,
|
||||
"saw": _render_saw,
|
||||
"scan": _render_scan,
|
||||
"scan_dual": _render_scan_dual,
|
||||
"snake": _render_snake,
|
||||
"solid": _render_solid,
|
||||
"sparkle": _render_sparkle,
|
||||
"stopwatch": _render_stopwatch,
|
||||
"strobe": _render_strobe,
|
||||
"sweep": _render_sweep,
|
||||
"two_dots": _render_two_dots,
|
||||
"wave_line": _render_wave_line,
|
||||
}
|
||||
63
Infinity_Vis_1/infinity_vis_1/presets.py
Normal file
63
Infinity_Vis_1/infinity_vis_1/presets.py
Normal file
@@ -0,0 +1,63 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import asdict, dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import json
|
||||
import re
|
||||
|
||||
from .models import PatternParams, SceneState
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class PresetRecord:
|
||||
name: str
|
||||
pattern_id: str
|
||||
tempo_bpm: float
|
||||
params: dict
|
||||
created_at: str
|
||||
|
||||
@classmethod
|
||||
def from_scene(cls, name: str, scene: SceneState) -> "PresetRecord":
|
||||
return cls(
|
||||
name=name,
|
||||
pattern_id=scene.pattern_id,
|
||||
tempo_bpm=scene.tempo_bpm,
|
||||
params=asdict(scene.params),
|
||||
created_at=datetime.utcnow().isoformat(timespec="seconds"),
|
||||
)
|
||||
|
||||
|
||||
class PresetStore:
|
||||
def __init__(self, root: str | Path) -> None:
|
||||
self.root = Path(root)
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def save(self, record: PresetRecord) -> Path:
|
||||
path = self.root / f"{slugify(record.name)}.json"
|
||||
path.write_text(json.dumps(asdict(record), indent=2), encoding="utf-8")
|
||||
return path
|
||||
|
||||
def list(self) -> list[PresetRecord]:
|
||||
presets: list[PresetRecord] = []
|
||||
for path in sorted(self.root.glob("*.json")):
|
||||
try:
|
||||
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||
presets.append(PresetRecord(**payload))
|
||||
except (OSError, json.JSONDecodeError, TypeError):
|
||||
continue
|
||||
return presets
|
||||
|
||||
def load_scene(self, name: str) -> SceneState:
|
||||
path = self.root / f"{slugify(name)}.json"
|
||||
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||
record = PresetRecord(**payload)
|
||||
return SceneState(
|
||||
pattern_id=record.pattern_id,
|
||||
tempo_bpm=float(record.tempo_bpm),
|
||||
params=PatternParams(**record.params),
|
||||
)
|
||||
|
||||
|
||||
def slugify(value: str) -> str:
|
||||
return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") or "preset"
|
||||
90
Infinity_Vis_1/infinity_vis_1/renderer.py
Normal file
90
Infinity_Vis_1/infinity_vis_1/renderer.py
Normal file
@@ -0,0 +1,90 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import time
|
||||
|
||||
from .colors import scale
|
||||
from .models import ControllerSpec, FrameSnapshot, MappingSpec, RGB, SceneState
|
||||
from .patterns import render_pattern, utility_pattern
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class RendererStats:
|
||||
tile_count: int
|
||||
controller_count: int
|
||||
total_leds: int
|
||||
|
||||
|
||||
class FastRenderer:
|
||||
def __init__(self, mapping: MappingSpec) -> None:
|
||||
self.mapping = mapping
|
||||
self.tiles = mapping.ordered_tiles()
|
||||
self.controllers: list[ControllerSpec] = mapping.build_controllers()
|
||||
self._tile_index = {tile.tile_id: index for index, tile in enumerate(self.tiles)}
|
||||
self._controller_buffers = {controller.host: bytearray(controller.payload_bytes) for controller in self.controllers}
|
||||
self._controller_zero = {host: bytes(len(buffer)) for host, buffer in self._controller_buffers.items()}
|
||||
self._gamma_table = self._build_gamma_table(mapping.global_gamma)
|
||||
|
||||
def stats(self) -> RendererStats:
|
||||
return RendererStats(
|
||||
tile_count=len(self.tiles),
|
||||
controller_count=len(self.controllers),
|
||||
total_leds=sum(tile.led_total for tile in self.tiles),
|
||||
)
|
||||
|
||||
def render(self, scene: SceneState, timestamp: float | None = None) -> FrameSnapshot:
|
||||
now = time.perf_counter() if timestamp is None else float(timestamp)
|
||||
if scene.utility_mode != "none":
|
||||
tile_colors = utility_pattern(self.mapping, scene.utility_mode, scene.selected_tile_id, now)
|
||||
else:
|
||||
tile_colors = render_pattern(self.mapping, scene.pattern_id, scene.params, now, scene.tempo_bpm)
|
||||
|
||||
controller_payloads = self._build_controller_payloads(tile_colors)
|
||||
return FrameSnapshot(
|
||||
timestamp=now,
|
||||
pattern_id=scene.pattern_id,
|
||||
tile_colors=tile_colors,
|
||||
controller_payloads=controller_payloads,
|
||||
)
|
||||
|
||||
def _build_controller_payloads(self, tile_colors: list[RGB]) -> dict[str, bytes]:
|
||||
for host, buffer in self._controller_buffers.items():
|
||||
buffer[:] = self._controller_zero[host]
|
||||
|
||||
for controller in self.controllers:
|
||||
payload = self._controller_buffers[controller.host]
|
||||
for route in controller.tiles:
|
||||
tile = self.tiles[self._tile_index[route.tile_id]]
|
||||
color = tile_colors[self._tile_index[route.tile_id]]
|
||||
color = self._apply_tile_brightness(color, tile.brightness)
|
||||
for byte_offset, led_count in route.segment_offsets:
|
||||
self._fill_segment(payload, byte_offset, led_count, color)
|
||||
|
||||
return {host: bytes(buffer) for host, buffer in self._controller_buffers.items()}
|
||||
|
||||
def _apply_tile_brightness(self, color: RGB, brightness: float) -> RGB:
|
||||
scaled = scale(color, brightness)
|
||||
return (
|
||||
self._gamma_table[scaled[0]],
|
||||
self._gamma_table[scaled[1]],
|
||||
self._gamma_table[scaled[2]],
|
||||
)
|
||||
|
||||
def _build_gamma_table(self, gamma: float) -> list[int]:
|
||||
gamma = max(0.1, float(gamma))
|
||||
table = [0] * 256
|
||||
for value in range(256):
|
||||
normalized = value / 255.0
|
||||
table[value] = max(0, min(255, int(round((normalized ** gamma) * 255.0))))
|
||||
return table
|
||||
|
||||
def _fill_segment(self, payload: bytearray, byte_offset: int, led_count: int, color: RGB) -> None:
|
||||
red, green, blue = color
|
||||
cursor = byte_offset
|
||||
for _ in range(max(0, led_count)):
|
||||
if cursor + 2 >= len(payload):
|
||||
break
|
||||
payload[cursor] = red
|
||||
payload[cursor + 1] = green
|
||||
payload[cursor + 2] = blue
|
||||
cursor += 3
|
||||
18
Infinity_Vis_1/pyproject.toml
Normal file
18
Infinity_Vis_1/pyproject.toml
Normal file
@@ -0,0 +1,18 @@
|
||||
[build-system]
|
||||
requires = ["setuptools>=68", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "infinity-vis-1"
|
||||
version = "0.1.0"
|
||||
description = "Fresh performance-oriented rebuild of Infinity Vis."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = []
|
||||
|
||||
[project.scripts]
|
||||
infinity-vis-1-live = "infinity_vis_1.live:main"
|
||||
infinity-vis-1-benchmark = "infinity_vis_1.benchmark:main"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
include = ["infinity_vis_1*"]
|
||||
100
Infinity_Vis_1/tests/test_engine.py
Normal file
100
Infinity_Vis_1/tests/test_engine.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import unittest
|
||||
|
||||
from Infinity_Vis_1.infinity_vis_1.engine import OutputSettings, RealtimeEngine
|
||||
from Infinity_Vis_1.infinity_vis_1.models import MappingSpec, PatternParams, SceneState, SegmentSpec, TileSpec
|
||||
|
||||
|
||||
class RecordingDDPClient:
|
||||
def __init__(self) -> None:
|
||||
self.sent_payloads: list[dict[str, bytes]] = []
|
||||
|
||||
def count_packets(self, host_payloads: dict[str, bytes]) -> int:
|
||||
return len(host_payloads)
|
||||
|
||||
def send(self, host_payloads: dict[str, bytes]) -> int:
|
||||
self.sent_payloads.append(dict(host_payloads))
|
||||
return self.count_packets(host_payloads)
|
||||
|
||||
|
||||
class EngineTests(unittest.TestCase):
|
||||
def _mapping(self) -> MappingSpec:
|
||||
return MappingSpec(
|
||||
name="engine-test",
|
||||
rows=1,
|
||||
cols=2,
|
||||
tiles=[
|
||||
TileSpec(
|
||||
tile_id="r1c1",
|
||||
row=1,
|
||||
col=1,
|
||||
led_total=2,
|
||||
controller_ip="192.168.0.10",
|
||||
segments=[SegmentSpec(name="all", side="top", start_channel=1, led_count=2)],
|
||||
),
|
||||
TileSpec(
|
||||
tile_id="r1c2",
|
||||
row=1,
|
||||
col=2,
|
||||
led_total=2,
|
||||
controller_ip="192.168.0.10",
|
||||
segments=[SegmentSpec(name="all", side="top", start_channel=1, led_count=2)],
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
def test_preview_rate_is_decoupled_from_output_rate(self) -> None:
|
||||
engine = RealtimeEngine(
|
||||
self._mapping(),
|
||||
scene=SceneState(pattern_id="solid", params=PatternParams(primary_color="#FF0000")),
|
||||
settings=OutputSettings(output_fps=40.0, preview_fps=10.0, delta_sending=False),
|
||||
ddp_client=RecordingDDPClient(),
|
||||
)
|
||||
|
||||
first = engine.tick(now=1.0)
|
||||
second = engine.tick(now=1.03)
|
||||
third = engine.tick(now=1.11)
|
||||
|
||||
self.assertTrue(first.output_due)
|
||||
self.assertTrue(first.preview_due)
|
||||
self.assertTrue(second.output_due)
|
||||
self.assertFalse(second.preview_due)
|
||||
self.assertTrue(third.output_due)
|
||||
self.assertTrue(third.preview_due)
|
||||
self.assertEqual(engine.diagnostics.output_frames, 3)
|
||||
self.assertEqual(engine.diagnostics.preview_frames, 2)
|
||||
|
||||
def test_delta_sending_skips_unchanged_frames_but_keeps_keepalive(self) -> None:
|
||||
engine = RealtimeEngine(
|
||||
self._mapping(),
|
||||
scene=SceneState(pattern_id="solid", params=PatternParams(primary_color="#00FF00")),
|
||||
settings=OutputSettings(output_fps=20.0, preview_fps=0.0, delta_sending=True, keepalive_seconds=0.35),
|
||||
ddp_client=RecordingDDPClient(),
|
||||
)
|
||||
|
||||
first = engine.tick(now=1.00)
|
||||
second = engine.tick(now=1.05)
|
||||
third = engine.tick(now=1.41)
|
||||
|
||||
self.assertEqual(list(first.changed_payloads), ["192.168.0.10"])
|
||||
self.assertEqual(second.changed_payloads, {})
|
||||
self.assertEqual(list(third.changed_payloads), ["192.168.0.10"])
|
||||
self.assertEqual(engine.diagnostics.stale_output_frames, 1)
|
||||
self.assertEqual(engine.diagnostics.keepalive_controller_frames, 1)
|
||||
|
||||
def test_send_due_uses_client_without_network(self) -> None:
|
||||
client = RecordingDDPClient()
|
||||
engine = RealtimeEngine(
|
||||
self._mapping(),
|
||||
scene=SceneState(pattern_id="solid", params=PatternParams(primary_color="#112233")),
|
||||
settings=OutputSettings(output_fps=30.0, preview_fps=0.0, delta_sending=False),
|
||||
ddp_client=client,
|
||||
)
|
||||
|
||||
result = engine.send_due(now=2.0)
|
||||
self.assertEqual(result.packet_count, 1)
|
||||
self.assertEqual(len(client.sent_payloads), 1)
|
||||
self.assertEqual(engine.diagnostics.packets_sent, 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
27
Infinity_Vis_1/tests/test_mapping_xml.py
Normal file
27
Infinity_Vis_1/tests/test_mapping_xml.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from pathlib import Path
|
||||
import unittest
|
||||
|
||||
from Infinity_Vis_1.infinity_vis_1.mapping_xml import load_mapping, load_mapping_from_string, mapping_to_xml_string
|
||||
|
||||
|
||||
class MappingXmlTests(unittest.TestCase):
|
||||
def test_load_sample_mapping(self) -> None:
|
||||
path = Path(__file__).resolve().parents[2] / "sample_data" / "infinity_mirror_mapping_clean.xml"
|
||||
mapping = load_mapping(path)
|
||||
self.assertEqual(mapping.rows, 3)
|
||||
self.assertEqual(mapping.cols, 6)
|
||||
self.assertEqual(len(mapping.tiles), 18)
|
||||
self.assertEqual(mapping.tiles[0].led_total, 106)
|
||||
|
||||
def test_round_trip_preserves_tile_count(self) -> None:
|
||||
path = Path(__file__).resolve().parents[2] / "sample_data" / "infinity_mirror_mapping_clean.xml"
|
||||
mapping = load_mapping(path)
|
||||
xml_text = mapping_to_xml_string(mapping)
|
||||
restored = load_mapping_from_string(xml_text)
|
||||
self.assertEqual(len(restored.tiles), len(mapping.tiles))
|
||||
self.assertEqual(restored.rows, mapping.rows)
|
||||
self.assertEqual(restored.cols, mapping.cols)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
48
Infinity_Vis_1/tests/test_renderer.py
Normal file
48
Infinity_Vis_1/tests/test_renderer.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import unittest
|
||||
|
||||
from Infinity_Vis_1.infinity_vis_1.models import MappingSpec, PatternParams, SceneState, SegmentSpec, TileSpec
|
||||
from Infinity_Vis_1.infinity_vis_1.renderer import FastRenderer
|
||||
|
||||
|
||||
class RendererTests(unittest.TestCase):
|
||||
def _mapping(self) -> MappingSpec:
|
||||
return MappingSpec(
|
||||
name="test",
|
||||
rows=1,
|
||||
cols=2,
|
||||
tiles=[
|
||||
TileSpec(
|
||||
tile_id="r1c1",
|
||||
row=1,
|
||||
col=1,
|
||||
led_total=2,
|
||||
controller_ip="192.168.0.10",
|
||||
segments=[SegmentSpec(name="all", side="top", start_channel=1, led_count=2)],
|
||||
),
|
||||
TileSpec(
|
||||
tile_id="r1c2",
|
||||
row=1,
|
||||
col=2,
|
||||
led_total=2,
|
||||
controller_ip="192.168.0.10",
|
||||
segments=[SegmentSpec(name="all", side="top", start_channel=1, led_count=2)],
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
def test_shared_ip_tiles_produce_one_controller_payload(self) -> None:
|
||||
renderer = FastRenderer(self._mapping())
|
||||
scene = SceneState(pattern_id="solid", params=PatternParams(primary_color="#FF0000"))
|
||||
frame = renderer.render(scene, timestamp=1.0)
|
||||
self.assertEqual(list(frame.controller_payloads), ["192.168.0.10"])
|
||||
self.assertEqual(len(frame.controller_payloads["192.168.0.10"]), 12)
|
||||
|
||||
def test_blackout_utility_zeroes_payload(self) -> None:
|
||||
renderer = FastRenderer(self._mapping())
|
||||
scene = SceneState(pattern_id="solid", utility_mode="blackout")
|
||||
frame = renderer.render(scene, timestamp=1.0)
|
||||
self.assertEqual(frame.controller_payloads["192.168.0.10"], bytes(12))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user