#!/usr/bin/env python3
"""Local Infinity visualizer for the exact Global-2D layer.

The browser does not reimplement WLED effects. This server proxies the master
state and renders only the Infinity Global-2D layer for the 6 x 3 x 106 layout.
"""

from __future__ import annotations

import argparse
import errno
import http.client
import json
import math
import socket
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any

NODE_COUNT = 6
ROWS = 3
LEDS_PER_PANEL = 106
OUTPUT_LABELS = ["UART6", "UART5", "UART4"]
MODE_NAMES = ["Off", "Center Pulse", "Checkerd", "Arrow", "Scan", "Snake", "Wave Line"]
VARIANT_NAMES = ["Expand / Classic / Line", "Reverse / Diagonal / Bands", "Outline / Checkerd", "Outline Reverse"]
BLEND_NAMES = ["Replace", "Add", "Multiply Mask", "Palette Tint"]
DIRECTION_NAMES = ["Left -> Right", "Right -> Left", "Top -> Bottom", "Bottom -> Top", "Outward", "Inward", "Ping Pong"]
BPM_MIN = 20
BPM_MAX = 240

# Highest valid TCP port; the port-fallback search never goes past it.
_MAX_PORT = 65535

# Failures that mean "could not talk to the master". URLError, socket.timeout
# and TimeoutError are all OSError subclasses; HTTPException covers protocol
# level failures (e.g. IncompleteRead) raised while reading the response.
_MASTER_UNREACHABLE_ERRORS = (OSError, http.client.HTTPException)

# NOTE(review): only the page's text is visible in this copy of the file;
# confirm that the full markup and script of the page are intact.
HTML = r""" Infinity Local Visualizer

Infinity Local Visualizer

Exact Global-2D layer preview. WLED base effects are not simulated.
Starting...
"""


def clamp_byte(value: float) -> int:
    """Round *value* to the nearest integer and clamp it to 0..255."""
    return max(0, min(255, int(round(value))))


def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """Return the cubic Hermite interpolation of *x* between two edges.

    When both edges coincide the function degenerates to a hard step at the
    edge instead of dividing by zero.
    """
    if edge0 == edge1:
        return 0.0 if x < edge0 else 1.0
    x = max(0.0, min(1.0, (x - edge0) / (edge1 - edge0)))
    return x * x * (3.0 - 2.0 * x)


def speed_to_bpm(speed: int) -> int:
    """Map a 0..255 speed value linearly onto BPM_MIN..BPM_MAX."""
    speed = max(0, min(255, int(speed)))
    return round(BPM_MIN + speed * (BPM_MAX - BPM_MIN) / 255.0)


def spatial_phase(now_us: int, speed: int) -> float:
    """Return the number of beats elapsed within the current 60 s window.

    *now_us* is a timestamp in microseconds; it is wrapped every 60 seconds
    before being multiplied by the beats-per-second rate derived from *speed*.
    """
    seconds = (now_us % 60_000_000) / 1_000_000.0
    cps = speed_to_bpm(speed) / 60.0
    return seconds * cps


def panel_led_position(led: int) -> tuple[float, float, int]:
    """Return ``(x, y, side)`` for an LED index on a panel's perimeter.

    Coordinates are normalized to 0..1. LEDs 0-24 lie on side 0 (y = 0),
    25-51 on side 1 (x = 1), 52-78 on side 2 (y = 1) and the rest on side 3
    (x = 0); each LED sits at the centre of its slot along the edge.
    """
    if led < 25:
        return (led + 0.5) / 25.0, 0.0, 0
    if led < 52:
        return 1.0, (led - 25 + 0.5) / 27.0, 1
    if led < 79:
        return 1.0 - ((led - 52 + 0.5) / 27.0), 1.0, 2
    return 0.0, 1.0 - ((led - 79 + 0.5) / 27.0), 3


def center_pulse_amount(col: int, row: int, led: int, now_us: int, speed: int, variant: int) -> int:
    """Return the 0..255 intensity of the "Center Pulse" mode for one LED.

    A front travels over the Manhattan distance from the grid centre
    (row 1, column 2.5). Variants 1 and 3 reverse the front; variants 2 and 3
    additionally keep only selected panel sides lit (outline).
    """
    distance = abs(row - 1.0) + abs(col - 2.5)
    max_distance = 3.5
    span = max_distance + 1.0
    front = (spatial_phase(now_us, speed) * span) % span
    if variant in (1, 3):
        front = max_distance - front
    amount = 1.0 - smoothstep(0.0, 0.70, abs(distance - front))
    value = clamp_byte(amount * 255.0)
    if variant in (2, 3):
        # Outline variants: light only the panel side selected for this cell.
        _, _, side = panel_led_position(led)
        if row == 1 and col in (2, 3):
            return value if side in (0, 2) else 0
        if col == 0:
            return value if side == 3 else 0
        if col == 5:
            return value if side == 1 else 0
        if row == 0:
            return value if side == 0 else 0
        if row == 2:
            return value if side == 2 else 0
    return value


def checker_amount(col: int, row: int, led: int, now_us: int, speed: int, variant: int) -> int:
    """Return 255 or 0 for the checkerboard mode.

    Variant 0 alternates whole panels every beat. Variants 1 and 2 split each
    panel along a diagonal; variant 2 flips the diagonal on odd beats.
    """
    parity = (row + col) & 1
    step = int(math.floor(spatial_phase(now_us, speed)))
    if variant in (1, 2):
        x, y, _ = panel_led_position(led)
        slash = variant == 2 and (step & 1)
        first = y <= (1.0 - x if slash else x)
        return 255 if ((parity == 0) == first) else 0
    return 255 if ((parity + step) & 1) == 0 else 0


def wave_line_amount(col: int, row: int, now_us: int, speed: int, direction: int) -> int:
    """Return 255 when the panel lies on the stepping triangle-wave line.

    Directions 2 and 3 move the wave along rows and select a column; every
    other direction moves it along columns and selects a row.
    """
    triangle = [0, 1, 2, 1]
    step = int(math.floor(spatial_phase(now_us, speed)))
    if direction in (2, 3):
        phase = step if direction == 2 else -step
        target = round(triangle[(row - phase) % 4] * ((NODE_COUNT - 1) / 2.0) / 2.0)
        return 255 if col == min(target, NODE_COUNT - 1) else 0
    phase = -step if direction == 1 else step
    target = round(triangle[(col - phase) % 4] * ((ROWS - 1) / 2.0))
    return 255 if row == min(target, ROWS - 1) else 0


def arrow_amount(col: int, row: int, now_us: int, speed: int, direction: int, size: int) -> int:
    """Return 255 when the panel is part of a moving arrow (chevron) pattern.

    Directions 2 and 3 use rows as the travel axis, all others use columns.
    *size* widens the gap between arrows by one panel per 86 units.
    """
    horizontal = direction not in (2, 3)
    major_count = NODE_COUNT if horizontal else ROWS
    minor_count = ROWS if horizontal else NODE_COUNT
    major = col if horizontal else row
    minor = row if horizontal else col
    gap = max(1, 1 + size // 86) - 1
    span = 3 + gap
    movement = int(math.floor(spatial_phase(now_us, speed)))
    # Band 0 is the centre line of the minor axis (the arrow tip), band 1 the rest.
    band = 0 if abs(minor - ((minor_count - 1) / 2.0)) <= 0.55 else 1
    orientation_right = direction in (0, 2, 4)
    target = 1 if band == 0 else (0 if orientation_right else 2)
    local = major - movement if orientation_right else major + movement
    return 255 if major_count > 0 and (local % span) == target else 0


def scan_amount(col: int, row: int, led: int, now_us: int, speed: int, size: int, angle: int, option: int, direction: int) -> int:
    """Return the 0..255 intensity of the "Scan" mode for one LED.

    The LED position is projected onto the scan axis given by *angle* (rotated
    by 90 degrees for directions 2 and 3). Direction 6 makes the scan line
    ping-pong, directions 1 and 3 reverse it. Option 1 renders repeating bands
    instead of a single line.
    """
    x, y, _ = panel_led_position(led)
    vertical = direction in (2, 3)
    radians = math.radians((angle + (90 if vertical else 0)) % 360)
    vx, vy = math.cos(radians), math.sin(radians)
    progress = (col + x) * vx + (row + y) * vy
    min_progress, max_progress = -3.0, 8.0
    width = 0.15 + (size / 255.0) * (1.60 if option == 1 else 0.85)
    travel = (max_progress - min_progress) + width
    phase = (spatial_phase(now_us, speed) * travel) % travel
    if direction == 6:
        # Ping pong: run over twice the travel and fold the second half back.
        phase = (spatial_phase(now_us, speed) * travel) % (travel * 2.0)
        if phase > travel:
            phase = (travel * 2.0) - phase
    elif direction in (1, 3):
        phase = travel - phase
    center = min_progress + phase
    if option == 1:
        period = width * 2.0 + 0.35
        # The period * 64 offset keeps the modulo operand positive.
        d = abs(((progress - center + period * 64.0) % period) - period * 0.5)
        return clamp_byte((1.0 - smoothstep(width * 0.45, width * 0.75, d)) * 255.0)
    return clamp_byte((1.0 - smoothstep(width * 0.5, width * 0.5 + 0.55, abs(progress - center))) * 255.0)


def snake_amount(col: int, row: int, now_us: int, speed: int, size: int, seed: int) -> int:
    """Return the intensity of the "Snake" mode for one panel.

    Panels are walked in a serpentine path (odd rows reversed). The snake body
    fades from 255 at the head in steps of 38 down to a floor of 55; a single
    "apple" panel derived from *seed* is drawn at 180.
    """
    path_len = NODE_COUNT * ROWS
    panel_index = row * NODE_COUNT + (NODE_COUNT - 1 - col if row & 1 else col)
    step = int(math.floor(spatial_phase(now_us, speed)))
    head = (step + seed % path_len) % path_len
    length = 3 + max(1, size // 64)
    for offset in range(length):
        if panel_index == (head + path_len - (offset % path_len)) % path_len:
            return max(55, 255 - offset * 38)
    apple = (seed * 17 + (step // path_len) * 11 + 7) % path_len
    return 180 if panel_index == apple else 0


def blend(primary: list[int], secondary: list[int], amount: int) -> list[int]:
    """Mix two RGB colours; amount 255 yields *primary*, 0 yields *secondary*."""
    return [clamp_byte(secondary[i] + (primary[i] - secondary[i]) * amount / 255.0) for i in range(3)]


def layer_amount(mode: int, col: int, row: int, led: int, now_us: int, spatial: dict[str, Any], scene: dict[str, Any]) -> int:
    """Dispatch to the per-mode amount function and return its 0..255 result.

    Missing settings fall back to speed 128, variant 0, direction 0, size 64,
    angle 0, option 0 and seed 1. Unknown modes (including 0, "Off") yield 0.
    """
    speed = int(scene.get("speed", 128))
    variant = int(spatial.get("variant", 0))
    direction = int(spatial.get("direction", 0))
    size = int(spatial.get("size", 64))
    if mode == 1:
        return center_pulse_amount(col, row, led, now_us, speed, variant)
    if mode == 2:
        return checker_amount(col, row, led, now_us, speed, variant)
    if mode == 3:
        return arrow_amount(col, row, now_us, speed, direction, size)
    if mode == 4:
        return scan_amount(col, row, led, now_us, speed, size, int(spatial.get("angle", 0)), int(spatial.get("option", 0)), direction)
    if mode == 5:
        return snake_amount(col, row, now_us, speed, size, int(scene.get("seed", 1)))
    if mode == 6:
        return wave_line_amount(col, row, now_us, speed, direction)
    return 0


def _name_or_unknown(names: list[str], index: int) -> str:
    """Return ``names[index]``, or "Unknown" for any out-of-range index.

    Negative indices are rejected explicitly so they never wrap around to the
    end of the list.
    """
    return names[index] if 0 <= index < len(names) else "Unknown"


def render_frame(state: dict[str, Any]) -> dict[str, Any]:
    """Render one frame of the Global-2D layer from the master's state.

    Returns a JSON-serializable dict with the scene, the node IPs, a
    ``ROWS x NODE_COUNT x LEDS_PER_PANEL`` grid of RGB triples under
    ``"panels"`` and human-readable names of the active settings. A missing
    or null ``"scene"`` / ``"spatial"`` entry is treated as empty.
    """
    scene = state.get("scene") or {}
    spatial = scene.get("spatial", {}) or {}
    mode = int(spatial.get("mode", 0))
    strength = int(spatial.get("strength", 180))
    primary = scene.get("primary", [255, 160, 80])[:3]
    secondary = scene.get("secondary", [0, 32, 255])[:3]
    # The scene "phase" is scaled by 1000 before being added to the
    # microsecond clock (presumably it is given in milliseconds).
    now_us = int(time.monotonic() * 1_000_000) + int(scene.get("phase", 0)) * 1000
    panels: list[list[list[list[int]]]] = []
    for row in range(ROWS):
        row_panels = []
        for col in range(NODE_COUNT):
            leds = []
            for led in range(LEDS_PER_PANEL):
                amount = layer_amount(mode, col, row, led, now_us, spatial, scene)
                amount = clamp_byte(amount * strength / 255.0)
                leds.append(blend(primary, secondary, amount) if amount else [0, 0, 0])
            row_panels.append(leds)
        panels.append(row_panels)
    return {
        "scene": scene,
        "node_ips": state.get("node_ips", []),
        "panels": panels,
        "mode_name": _name_or_unknown(MODE_NAMES, mode),
        "variant_name": _name_or_unknown(VARIANT_NAMES, int(spatial.get("variant", 0))),
        "blend_name": _name_or_unknown(BLEND_NAMES, int(spatial.get("blend", 2))),
        "direction_name": _name_or_unknown(DIRECTION_NAMES, int(spatial.get("direction", 0))),
        "note": "2D layer exact; WLED base not simulated",
    }


class VisualizerServer(ThreadingHTTPServer):
    """Threading HTTP server carrying the default master and request timeout."""

    allow_reuse_address = True
    master: str  # default master host, set by main()
    timeout_s: float  # timeout for requests to the master, set by main()


class Handler(BaseHTTPRequestHandler):
    """Serve the visualizer page and the JSON proxy/render endpoints."""

    server: VisualizerServer

    def log_message(self, fmt: str, *args: Any) -> None:
        """Log a request to stderr, skipping 502/504 replies of /api/ calls."""
        message = fmt % args
        if '"GET /api/' in message and (' 502 ' in message or ' 504 ' in message):
            return
        sys.stderr.write("%s - %s\n" % (self.log_date_time_string(), message))

    def send_bytes(self, status: int, content_type: str, body: bytes) -> None:
        """Send a complete, uncached response; ignore clients that went away."""
        try:
            self.send_response(status)
            self.send_header("Content-Type", content_type)
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Cache-Control", "no-store")
            self.end_headers()
            self.wfile.write(body)
        except (BrokenPipeError, ConnectionResetError):
            # Deliberate best effort: the client closed the connection.
            pass

    def send_json(self, status: int, payload: Any, compact: bool = False) -> None:
        """Serialize *payload* as JSON and send it with the given status."""
        if compact:
            text = json.dumps(payload, separators=(",", ":"))
        else:
            text = json.dumps(payload)
        self.send_bytes(status, "application/json", text.encode("utf-8"))

    def do_GET(self) -> None:
        """Route GET requests to the page, the API endpoints or a 404."""
        if self.path == "/" or self.path.startswith("/?"):
            self.send_bytes(200, "text/html; charset=utf-8", HTML.encode("utf-8"))
            return
        if self.path.startswith("/api/infinity"):
            self.proxy_infinity(self.master_from_query())
            return
        if self.path.startswith("/api/frame"):
            self.proxy_frame(self.master_from_query())
            return
        if self.path == "/health":
            self.send_bytes(200, "application/json", b'{"ok":true}')
            return
        self.send_bytes(404, "text/plain; charset=utf-8", b"not found")

    def master_from_query(self) -> str:
        """Return the ``master`` query parameter, or the server default."""
        values = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        return values.get("master", [self.server.master])[0].strip() or self.server.master

    def fetch_master_state(self, master: str) -> dict[str, Any]:
        """Fetch and decode ``/json/infinity`` from *master*.

        Raises OSError (including URLError and timeouts) or
        http.client.HTTPException when the master cannot be reached, and
        UnicodeDecodeError / json.JSONDecodeError when the body is not valid
        UTF-8 JSON.
        """
        url = f"http://{master}/json/infinity"
        with urllib.request.urlopen(url, timeout=self.server.timeout_s) as response:
            body = response.read()
        return json.loads(body.decode("utf-8"))

    def proxy_infinity(self, master: str) -> None:
        """Relay the master's state; 504 if unreachable, 502 on bad JSON."""
        try:
            state = self.fetch_master_state(master)
        except _MASTER_UNREACHABLE_ERRORS as exc:
            self.send_json(504, {"error":"master unreachable","detail":str(exc)})
            return
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            self.send_json(502, {"error":f"invalid master JSON: {exc}"})
            return
        self.send_json(200, state)

    def proxy_frame(self, master: str) -> None:
        """Fetch the master's state and reply with the rendered frame."""
        try:
            state = self.fetch_master_state(master)
            frame = render_frame(state)
        except _MASTER_UNREACHABLE_ERRORS as exc:
            self.send_json(504, {"error":"master unreachable","detail":str(exc)})
            return
        except Exception as exc:  # keep the operator UI alive and explicit
            self.send_json(502, {"error":f"frame render failed: {exc}"})
            return
        self.send_json(200, frame, compact=True)


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the visualizer."""
    parser = argparse.ArgumentParser(description="Serve a local Infinity Global-2D visualizer.")
    parser.add_argument("--master", default="10.42.0.213", help="Infinity master IP or hostname")
    parser.add_argument("--bind", default="127.0.0.1", help="Local bind address")
    parser.add_argument("--port", type=int, default=8765, help="Local HTTP port")
    parser.add_argument("--no-port-fallback", action="store_true", help="Fail instead of trying the next ports when busy")
    parser.add_argument("--timeout", type=float, default=1.2, help="Master request timeout in seconds")
    return parser.parse_args()


def bind_server(args: argparse.Namespace) -> VisualizerServer:
    """Create the server on ``args.port`` or, unless disabled, a nearby port.

    Up to 50 consecutive ports are tried (never past 65535) while the bind
    fails with EADDRINUSE. Raises SystemExit when every tried port is busy and
    re-raises any other OSError.
    """
    if args.no_port_fallback:
        last_port = args.port
    else:
        last_port = min(args.port + 49, _MAX_PORT)
    last_error: OSError | None = None
    for port in range(args.port, last_port + 1):
        try:
            server = VisualizerServer((args.bind, port), Handler)
        except OSError as exc:
            last_error = exc
            if exc.errno != errno.EADDRINUSE:
                break
            continue
        if port != args.port:
            print(f"Port {args.port} is busy, using {port} instead.")
        return server
    if last_error is not None and last_error.errno == errno.EADDRINUSE:
        # Report exactly the ports that were tried.
        if last_port == args.port:
            raise SystemExit(f"Could not start visualizer: port {args.port} is busy.")
        raise SystemExit(f"Could not start visualizer: ports {args.port}-{last_port} are busy.")
    raise last_error or RuntimeError("Could not bind visualizer server")


def main() -> int:
    """Run the visualizer server until interrupted; return the exit code."""
    args = parse_args()
    server = bind_server(args)
    server.master = args.master
    server.timeout_s = args.timeout
    host, port = server.server_address[:2]
    print(f"Infinity visualizer: http://{host}:{port}/?master={args.master}")
    print(f"Proxying master: http://{args.master}/json/infinity")
    print("Rendering exact Infinity Global-2D layer; WLED base effects are not simulated.")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nStopped.")
    finally:
        server.server_close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())