#!/usr/bin/env python3
"""Local Infinity visualizer for the Infinity Global-2D layer.
This server proxies the master state and renders the Infinity Global-2D layer for the browser.
Effect-mask modes use a synthetic color preview so geometry, masks and timing
can be checked without reimplementing the full WLED effect engine.
"""
from __future__ import annotations
import argparse
import errno
import json
import math
import socket
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
# Grid geometry: NODE_COUNT columns (labelled "ESP<n>" in the panel captions)
# by ROWS rows (one entry of OUTPUT_LABELS per row).
NODE_COUNT = 6
ROWS = 3
# LEDs on one panel outline; panel_led_position() splits them 25/27/27/27
# across the four sides.
LEDS_PER_PANEL = 106
# Caption suffix per row, indexed by row number.
OUTPUT_LABELS = ["UART6", "UART5", "UART4"]
# Display names, indexed by the numeric mode / variant / direction values
# found in the scene's "spatial" settings.
MODE_NAMES = ["Off", "Center Pulse", "Checkerd", "Arrow", "Scan", "Snake", "Wave Line", "Strobe", "Schlängeln", "Sunburst"]
VARIANT_NAMES = ["Expand / Classic / Line", "Reverse / Diagonal / Bands", "Outline / Checkerd", "Outline Reverse"]
# Mode 8 (Schlängeln) and mode 9 (Sunburst) use their own variant names.
SCHLAENGELN_VARIANT_NAMES = ["Top Left", "Top Right", "Bottom Left", "Bottom Right"]
SUNBURST_VARIANT_NAMES = ["Still", "Wobble", "Rotate"]
DIRECTION_NAMES = ["Left -> Right", "Right -> Left", "Top -> Bottom", "Bottom -> Top", "Outward", "Inward", "Ping Pong"]
# Range that the 0..255 speed value is mapped onto by speed_to_bpm().
BPM_MIN = 20
BPM_MAX = 240
PANEL_GAP_RATIO = 0.50 # 8 cm gap for roughly 16 cm active panel aperture.
# Keep in sync with wled00/infinity_sync.cpp. Values are:
# rotation quarter-turns clockwise, mirror X, mirror Y.
# Indexed as PANEL_TRANSFORMS[row][col].
PANEL_TRANSFORMS: list[list[tuple[int, bool, bool]]] = [
    [(0, False, False), (0, False, False), (0, False, False), (0, False, False), (0, False, False), (0, False, False)],
    [(0, False, False), (0, False, False), (0, False, False), (0, False, False), (0, False, False), (0, False, False)],
    [(0, False, False), (0, False, False), (0, False, False), (0, False, False), (0, False, False), (0, False, False)],
]
# Page body returned by Handler.do_GET() for "/" (sent UTF-8 encoded).
HTML = r"""
Infinity Local Visualizer
Starting...
"""
def clamp_byte(value: float) -> int:
    """Round *value* to the nearest integer and clamp it into 0..255."""
    rounded = int(round(value))
    if rounded < 0:
        return 0
    if rounded > 255:
        return 255
    return rounded
def smoothstep(edge0: float, edge1: float, x: float) -> float:
    """Hermite-interpolate *x* between *edge0* and *edge1*.

    Returns 0.0 at or below ``edge0``, 1.0 at or above ``edge1`` and a smooth
    ``3t^2 - 2t^3`` ramp in between. When both edges coincide the function
    degenerates into a hard step at that edge.
    """
    if edge0 == edge1:
        if x < edge0:
            return 0.0
        return 1.0
    ratio = (x - edge0) / (edge1 - edge0)
    # Clamp to [0, 1]: upper bound first, then lower bound.
    t = ratio if ratio < 1.0 else 1.0
    t = t if t > 0.0 else 0.0
    return t * t * (3.0 - 2.0 * t)
def speed_to_bpm(speed: int) -> int:
    """Map a 0..255 speed value linearly onto BPM_MIN..BPM_MAX (rounded)."""
    clamped = min(255, max(0, int(speed)))
    bpm_span = BPM_MAX - BPM_MIN
    return round(BPM_MIN + clamped * bpm_span / 255.0)
def spatial_beat_position(now_us: int, speed: int) -> float:
    """Return the fractional beat count elapsed within the current 60 s window.

    *now_us* is a microsecond timestamp; it is wrapped every 60 seconds before
    being converted to beats at the BPM derived from *speed*.
    """
    wrapped_us = now_us % 60_000_000
    beats_per_second = speed_to_bpm(speed) / 60.0
    return (wrapped_us / 1_000_000.0) * beats_per_second
def spatial_step_position(now_us: int, speed: int) -> float:
    """Return the animation step position, which advances two steps per beat."""
    # One measured on/off panel cycle consists of two visible animation
    # phases, so doubling the beat position makes that cycle match the BPM.
    beat_position = spatial_beat_position(now_us, speed)
    return 2.0 * beat_position
def spatial_beat_index(now_us: int, speed: int) -> int:
    """Return the whole-number part (floor) of the current beat position."""
    position = spatial_beat_position(now_us, speed)
    return int(position // 1)
def spatial_step_index(now_us: int, speed: int) -> int:
    """Return the whole-number part (floor) of the current step position."""
    position = spatial_step_position(now_us, speed)
    return int(position // 1)
def spatial_beat_frac(now_us: int, speed: int) -> float:
    """Return how far the current beat has progressed, as a value in [0, 1)."""
    position = spatial_beat_position(now_us, speed)
    whole_beats = math.floor(position)
    return position - whole_beats
def strobe_amount(now_us: int, speed: int, pulse_width: int) -> int:
    """Return 255 while the strobe is in its on-phase, otherwise 0.

    The strobe runs at eight flashes per beat; *pulse_width* (0..255) scales
    the on-fraction of each flash from 1 % up to 35 %.
    """
    width = min(255, max(0, int(pulse_width)))
    duty = 0.01 + (width / 255.0) * 0.34
    flash_position = spatial_beat_position(now_us, speed) * 8.0
    within_flash = flash_position - math.floor(flash_position)
    if within_flash < duty:
        return 255
    return 0
def serpentine_panel_index(col: int, row: int) -> int:
    """Return the position of panel (col, row) along a serpentine path.

    Even rows run in increasing column order, odd rows in decreasing order.
    """
    if row & 1:
        col_along_row = NODE_COUNT - 1 - col
    else:
        col_along_row = col
    return row * NODE_COUNT + col_along_row
def mirrored_serpentine_panel_index(col: int, row: int, variant: int) -> int:
    """Return the serpentine index of a panel after the variant's mirroring.

    Variants 1 and 3 flip the columns, variants 2 and 3 flip the rows; any
    other variant leaves the coordinates untouched.
    """
    flip_cols = variant in (1, 3)
    flip_rows = variant in (2, 3)
    mirrored_col = NODE_COUNT - 1 - col if flip_cols else col
    mirrored_row = ROWS - 1 - row if flip_rows else row
    return serpentine_panel_index(mirrored_col, mirrored_row)
def chain_length_from_size(size: int) -> int:
    """Translate a 0..255 size value into a chain length of 1..18 panels.

    Sizes up to 64 grow one panel per 16 units (minimum 1); larger sizes are
    spread over the remaining lengths up to 18.
    """
    clamped = min(255, max(0, int(size)))
    if clamped > 64:
        extra = ((clamped - 64) * 14 + 95) // 191
        return min(18, 4 + extra)
    return max(1, clamped // 16)
def snake_length_from_size(size: int) -> int:
    """Translate a 0..255 size value into the snake's target length."""
    clamped = min(255, max(0, int(size)))
    growth = max(1, clamped // 64)
    return min(17, 3 + growth)
def spatial_hash(value: int) -> int:
    """Mix *value* into a 32-bit hash using xor-shift / multiply rounds."""
    mask = 0xFFFFFFFF
    mixed = value & mask
    # Two xor-shift + multiply rounds followed by a final xor-shift.
    for shift, multiplier in ((16, 0x7FEB352D), (15, 0x846CA68B)):
        mixed ^= mixed >> shift
        mixed = (mixed * multiplier) & mask
    mixed ^= mixed >> 16
    return mixed & mask
def grid_panel_index(col: int, row: int) -> int:
    """Return the row-major index of the panel at (col, row)."""
    row_start = row * NODE_COUNT
    return row_start + col
def grid_col(index: int) -> int:
    """Return the column of a row-major panel index."""
    _, col = divmod(index, NODE_COUNT)
    return col
def grid_row(index: int) -> int:
    """Return the row of a row-major panel index."""
    row, _ = divmod(index, NODE_COUNT)
    return row
def snake_body_contains(body: list[int], position: int, skip_tail: int = 0) -> bool:
    """Tell whether *position* is occupied by the snake.

    The last *skip_tail* segments of *body* are ignored.
    """
    considered = len(body) - skip_tail
    if considered <= 0:
        return False
    for segment in body[:considered]:
        if segment == position:
            return True
    return False
def snake_spawn_apple(body: list[int], seed: int, generation: int) -> int:
    """Pick a pseudo-random free panel for the next apple.

    Starting from a hashed cell, cells are probed in strides of 7 until one is
    found that the snake does not occupy. If every probe is occupied the
    snake's head cell is returned.
    """
    cell_count = NODE_COUNT * ROWS
    cell = spatial_hash(seed ^ (generation * 0x9E3779B9)) % cell_count
    attempts = 0
    while attempts < cell_count:
        if cell not in body:
            return cell
        cell = (cell + 7) % cell_count
        attempts += 1
    return body[0]
def snake_neighbors(position: int) -> list[tuple[int, int]]:
    """List the in-grid neighbours of *position* as (direction, index) pairs.

    Direction codes: 0 = col + 1, 2 = row + 1, 1 = col - 1, 3 = row - 1.
    The pairs are returned in exactly that order.
    """
    col = grid_col(position)
    row = grid_row(position)
    # (direction, is reachable, neighbour col, neighbour row)
    candidates = (
        (0, col + 1 < NODE_COUNT, col + 1, row),
        (2, row + 1 < ROWS, col, row + 1),
        (1, col > 0, col - 1, row),
        (3, row > 0, col, row - 1),
    )
    return [
        (direction, grid_panel_index(next_col, next_row))
        for direction, reachable, next_col, next_row in candidates
        if reachable
    ]
def snake_distance(a: int, b: int) -> int:
    """Return the Manhattan distance between two row-major panel indices."""
    row_a, col_a = divmod(a, NODE_COUNT)
    row_b, col_b = divmod(b, NODE_COUNT)
    return abs(col_a - col_b) + abs(row_a - row_b)
def snake_reset(seed: int, epoch: int, target_length: int, generation: int) -> tuple[list[int], int, int]:
    """Start a fresh snake and return ``(body, apple, generation)``.

    The head cell is derived from *seed* and *epoch*; the body starts with up
    to three segments stacked on that cell. The generation counter is
    incremented before a new apple is spawned.
    """
    cell_count = NODE_COUNT * ROWS
    start_cell = spatial_hash(seed ^ (epoch * 0x45D9F3B)) % cell_count
    initial_segments = min(target_length, 3)
    body = [start_cell] * initial_segments
    next_generation = generation + 1
    apple = snake_spawn_apple(body, seed ^ epoch, next_generation)
    return body, apple, next_generation
def snake_state_at(local_step: int, target_length: int, seed: int) -> tuple[list[int], int]:
    """Replay the snake simulation from its start and return its state.

    The game is re-simulated from a fresh ``snake_reset`` for ``local_step``
    steps using only hashed values of *seed*, the step number and the
    generation counter.

    Returns:
        ``(body, apple)``: ``body`` lists row-major panel indices with the
        head first, ``apple`` is the panel index of the current apple.
    """
    body, apple, generation = snake_reset(seed, 0, target_length, 0)
    # Direction codes as produced by snake_neighbors():
    # 0 = col + 1, 2 = row + 1, 1 = col - 1, 3 = row - 1.
    base_order = [0, 2, 1, 3]
    for step in range(local_step):
        # Rotate the order in which directions are tried by a hashed offset;
        # since only a strictly smaller distance replaces the current best,
        # this decides which of several equally close moves wins.
        order_offset = spatial_hash(seed ^ (step * 0x9E3779B1) ^ generation) & 3
        ordered = base_order[order_offset:] + base_order[:order_offset]
        by_dir = dict(snake_neighbors(body[0]))
        best = None
        best_distance = 999
        for direction in ordered:
            if direction not in by_dir:
                continue
            nxt = by_dir[direction]
            will_eat = nxt == apple
            # When not eating, the last segment is ignored in the collision
            # check because the move below drops it.
            if snake_body_contains(body, nxt, 0 if will_eat else 1):
                continue
            distance = snake_distance(nxt, apple)
            if distance < best_distance:
                best_distance = distance
                best = nxt
        if best is None:
            # No legal move: restart the snake, using the step number as epoch.
            body, apple, generation = snake_reset(seed, step + 1, target_length, generation)
            continue
        ate = best == apple
        # Grow by one segment on eating, capped at target_length.
        new_len = min(target_length, len(body) + 1) if ate else len(body)
        body = [best] + body[:new_len - 1]
        if ate:
            generation += 1
            apple = snake_spawn_apple(body, seed ^ step, generation)
    return body, apple
def triangle_step(step: int, max_index: int) -> int:
    """Bounce *step* back and forth between 0 and *max_index* (triangle wave)."""
    if max_index <= 0:
        return 0
    cycle = 2 * max_index
    rising = step % cycle
    # The falling half mirrors the rising half, so the smaller of the two
    # distances to the cycle ends is the bounced position.
    return min(rising, cycle - rising)
def schlaengeln_pingpong_position(step: int, offset: int, max_index: int) -> int:
    """Return the ping-pong path position of the segment *offset* steps behind.

    The head bounces between 0 and *max_index*; a segment with the given
    offset follows the same bounce, delayed by *offset* steps.
    """
    if max_index <= 0:
        return 0
    cycle = 2 * max_index
    delayed = (step + cycle - (offset % cycle)) % cycle
    return min(delayed, cycle - delayed)
def panel_led_position(led: int) -> tuple[float, float, int]:
    """Map a panel-local LED number to ``(x, y, side)`` on the unit square.

    The outline is walked as four sides: side 0 holds 25 LEDs along y = 0 with
    x increasing, side 1 holds 27 along x = 1 with y increasing, side 2 holds
    27 along y = 1 with x decreasing, and side 3 holds the rest along x = 0
    with y decreasing. Each LED sits at the centre of its slot.
    """
    if led < 25:
        return (led + 0.5) / 25.0, 0.0, 0
    if led < 52:
        side, first_led = 1, 25
    elif led < 79:
        side, first_led = 2, 52
    else:
        side, first_led = 3, 79
    along = (led - first_led + 0.5) / 27.0
    if side == 1:
        return 1.0, along, 1
    if side == 2:
        return 1.0 - along, 1.0, 2
    return 0.0, 1.0 - along, 3
def apply_panel_transform(col: int, row: int, x: float, y: float) -> tuple[float, float]:
    """Apply the calibration of panel (col, row) to a unit-square point.

    Mirroring is applied first, then the configured number of quarter-turns.
    Panels without an entry in PANEL_TRANSFORMS are returned unchanged.
    """
    try:
        quarter_turns, flip_x, flip_y = PANEL_TRANSFORMS[row][col]
    except IndexError:
        return x, y
    px = 1.0 - x if flip_x else x
    py = 1.0 - y if flip_y else y
    remaining = quarter_turns & 3
    while remaining:
        px, py = 1.0 - py, px
        remaining -= 1
    return px, py
def physical_panel_led_position(col: int, row: int, led: int) -> tuple[float, float]:
    """Return the wall coordinates of an LED, in units of one panel width.

    The panel-local position is calibrated with ``apply_panel_transform`` and
    then offset by the panel pitch (panel width plus gap).
    """
    local_x, local_y, _side = panel_led_position(led)
    local_x, local_y = apply_panel_transform(col, row, local_x, local_y)
    panel_pitch = 1.0 + PANEL_GAP_RATIO
    origin_x = col * panel_pitch
    origin_y = row * panel_pitch
    return origin_x + local_x, origin_y + local_y
def apply_sunburst_panel_transform(x: float, y: float) -> tuple[float, float]:
    """Rotate a unit-square point by three quarter-turns for Sunburst."""
    # Match firmware: Sunburst gets the observed 90-degree-left correction,
    # while Scan and all other modes keep the neutral global transform.
    px, py = x, y
    turns_left = 3
    while turns_left > 0:
        px, py = 1.0 - py, px
        turns_left -= 1
    return px, py
def sunburst_panel_led_position(col: int, row: int, led: int) -> tuple[float, float]:
    """Return the wall coordinates of an LED as used by the Sunburst mode.

    Same layout as ``physical_panel_led_position`` but with the fixed Sunburst
    rotation instead of the per-panel calibration.
    """
    local_x, local_y, _side = panel_led_position(led)
    local_x, local_y = apply_sunburst_panel_transform(local_x, local_y)
    panel_pitch = 1.0 + PANEL_GAP_RATIO
    origin_x = col * panel_pitch
    origin_y = row * panel_pitch
    return origin_x + local_x, origin_y + local_y
def physical_panel_center() -> tuple[float, float]:
    """Return the centre of the whole panel wall in wall coordinates."""
    panel_pitch = 1.0 + PANEL_GAP_RATIO
    # Total extent = pitch for every panel but the last, plus one panel width.
    total_width = (NODE_COUNT - 1) * panel_pitch + 1.0
    total_height = (ROWS - 1) * panel_pitch + 1.0
    return total_width * 0.5, total_height * 0.5
def hsv_to_rgb(hue: float, sat: float = 1.0, val: float = 1.0) -> list[int]:
    """Convert an HSV colour to an ``[r, g, b]`` list of 0..255 bytes.

    Args:
        hue: Hue as a fraction of the colour wheel; any real value is
            accepted and wrapped into one turn.
        sat: Saturation, 0.0..1.0.
        val: Value / brightness, 0.0..1.0.

    Returns:
        The three channels rounded and clamped with ``clamp_byte``.
    """
    sector = (hue % 1.0) * 6.0
    whole = math.floor(sector)
    f = sector - whole
    # Float modulo can round up to exactly 1.0 for tiny negative hues
    # (e.g. -1e-18 % 1.0 == 1.0), which would give sector index 6. Wrap it
    # back to 0 so such hues map to red like hue 0.0.
    index = int(whole) % 6
    p = val * (1.0 - sat)
    q = val * (1.0 - sat * f)
    t = val * (1.0 - sat * (1.0 - f))
    r, g, b = (
        (val, t, p),
        (q, val, p),
        (p, val, t),
        (p, q, val),
        (t, p, val),
        (val, p, q),
    )[index]
    return [clamp_byte(r * 255), clamp_byte(g * 255), clamp_byte(b * 255)]
def center_pulse_amount(col: int, row: int, led: int, now_us: int, speed: int, variant: int) -> int:
    """Return the Center Pulse mask value (0..255) for one LED.

    A soft ring travels over the Manhattan distance from the grid centre
    (row 1, column 2.5). Variants 1 and 3 reverse the travel; variants 2 and 3
    additionally restrict the light to selected panel sides (outline look).
    """
    max_distance = 3.5
    wrap = max_distance + 1.0
    panel_distance = abs(row - 1.0) + abs(col - 2.5)
    front = spatial_step_position(now_us, speed) % wrap
    if variant in (1, 3):
        front = max_distance - front
    closeness = 1.0 - smoothstep(0.0, 0.70, abs(panel_distance - front))
    value = clamp_byte(closeness * 255.0)
    if variant not in (2, 3):
        return value
    side = panel_led_position(led)[2]
    # Outline variants: choose which sides of this panel may light up. The
    # first matching rule wins.
    if row == 1 and col in (2, 3):
        lit_sides = (0, 2)
    elif col == 0:
        lit_sides = (3,)
    elif col == 5:
        lit_sides = (1,)
    elif row == 0:
        lit_sides = (0,)
    elif row == 2:
        lit_sides = (2,)
    else:
        return value
    return value if side in lit_sides else 0
def checker_amount(col: int, row: int, led: int, now_us: int, speed: int, variant: int) -> int:
    """Return the checkerboard mask value (0 or 255) for one LED.

    Panels alternate with every animation step. Variants 1 and 2 split each
    panel along a diagonal and light one half; variant 2 flips the diagonal
    on odd steps.
    """
    step = spatial_step_index(now_us, speed)
    panel_parity = (row + col) & 1
    panel_on = ((panel_parity + step) & 1) == 0
    if variant not in (1, 2):
        return 255 if panel_on else 0
    x, y, _side = panel_led_position(led)
    flip_diagonal = variant == 2 and (step & 1)
    diagonal_y = 1.0 - x if flip_diagonal else x
    in_first_half = y <= diagonal_y
    if panel_on == in_first_half:
        return 255
    return 0
def wave_line_amount(col: int, row: int, now_us: int, speed: int, direction: int) -> int:
    """Return the Wave Line mask value (0 or 255) for one panel.

    A 0-1-2-1 zig-zag line scrolls one panel per animation step. Directions 2
    and 3 scroll along the rows, every other direction along the columns;
    directions 1 and 3 scroll the opposite way.
    """
    step = spatial_step_index(now_us, speed)
    zigzag = (0, 1, 2, 1)
    if direction in (2, 3):
        shift = -step if direction == 3 else step
        level = zigzag[(row - shift) % 4]
        target = round(level * ((NODE_COUNT - 1) / 2.0) / 2.0)
        lit = col == min(target, NODE_COUNT - 1)
    else:
        shift = -step if direction == 1 else step
        level = zigzag[(col - shift) % 4]
        target = round(level * ((ROWS - 1) / 2.0))
        lit = row == min(target, ROWS - 1)
    return 255 if lit else 0
def arrow_amount(col: int, row: int, now_us: int, speed: int, direction: int, size: int) -> int:
    """Return the Arrow mask value (0 or 255) for one panel.

    The arrow moves along the major axis (rows for directions 2 and 3,
    columns otherwise) one panel per step and repeats every ``3 + gap``
    panels, where the gap grows with *size*. Panels on the centre band of the
    minor axis form the tip; the others trail behind it.
    """
    if direction in (2, 3):
        major, minor = row, col
        major_count, minor_count = ROWS, NODE_COUNT
    else:
        major, minor = col, row
        major_count, minor_count = NODE_COUNT, ROWS
    gap = max(1, 1 + size // 86) - 1
    repeat = 3 + gap
    travelled = spatial_step_index(now_us, speed)
    on_centre_band = abs(minor - ((minor_count - 1) / 2.0)) <= 0.55
    forward = direction in (0, 2, 4)
    if on_centre_band:
        wanted = 1
    elif forward:
        wanted = 0
    else:
        wanted = 2
    local = major - travelled if forward else major + travelled
    if major_count > 0 and (local % repeat) == wanted:
        return 255
    return 0
def scan_amount(col: int, row: int, led: int, now_us: int, speed: int, size: int, angle: int, option: int, direction: int) -> int:
    """Return the Scan mask value (0..255) for one LED.

    Every LED is projected onto the scan axis given by *angle* (plus 90
    degrees for directions 2 and 3). A soft-edged bar whose width follows
    *size* travels along that axis: directions 1 and 3 run backwards,
    direction 6 bounces. Option 1 repeats the bar as periodic bands.
    """
    x, y, _side = panel_led_position(led)
    extra_degrees = 90 if direction in (2, 3) else 0
    theta = math.radians((angle + extra_degrees) % 360)
    axis_x = math.cos(theta)
    axis_y = math.sin(theta)
    progress = (col + x) * axis_x + (row + y) * axis_y
    # Project the four grid corners to find the range the bar has to cover.
    corner_x = NODE_COUNT * axis_x
    corner_y = ROWS * axis_y
    corners = (0.0, corner_x, corner_y, corner_x + corner_y)
    lowest = min(*corners)
    highest = max(*corners)
    band_factor = 1.60 if option == 1 else 0.85
    width = 0.15 + (size / 255.0) * band_factor
    travel = highest - lowest
    if travel <= 0.001:
        return 0
    step_position = spatial_step_position(now_us, speed)
    if direction == 6:
        phase = step_position % (travel * 2.0)
        if phase > travel:
            phase = (travel * 2.0) - phase
    elif direction in (1, 3):
        phase = travel - (step_position % travel)
    else:
        phase = step_position % travel
    center = lowest + max(0.0, min(travel, phase))
    if option == 1:
        period = width * 2.0 + 0.35
        wrapped = (progress - center + period * 64.0) % period
        band_distance = abs(wrapped - period * 0.5)
        falloff = smoothstep(width * 0.45, width * 0.75, band_distance)
        return clamp_byte((1.0 - falloff) * 255.0)
    falloff = smoothstep(width * 0.5, width * 0.5 + 0.55, abs(progress - center))
    return clamp_byte((1.0 - falloff) * 255.0)
def snake_sample(col: int, row: int, now_us: int, speed: int, size: int, seed: int) -> tuple[int, str]:
    """Return ``(amount, role)`` of one panel in Snake mode.

    Snake segments are drawn in the primary colour, the apple in the
    secondary colour; every other panel is dark. The simulation loops every
    240 steps.
    """
    here = grid_panel_index(col, row)
    wanted_length = snake_length_from_size(size)
    loop_step = spatial_step_index(now_us, speed) % 240
    body, apple = snake_state_at(loop_step, wanted_length, seed)
    if here in body:
        return 255, "primary"
    if here == apple:
        return 255, "secondary"
    return 0, "gradient"
def schlaengeln_sample(col: int, row: int, now_us: int, speed: int, size: int, variant: int, direction: int) -> tuple[int, str]:
    """Return ``(amount, role)`` of one panel in Schlängeln mode.

    A chain of panels crawls along the (optionally mirrored) serpentine path.
    Direction 1 runs the path backwards, direction 6 bounces between both
    ends, any other direction runs forwards. Brightness drops by 30 per
    segment behind the head, but never below 55.
    """
    path_len = NODE_COUNT * ROWS
    here = mirrored_serpentine_panel_index(col, row, variant)
    chain_length = chain_length_from_size(size)
    step = spatial_step_index(now_us, speed)
    for offset in range(chain_length):
        if direction == 6:
            segment = schlaengeln_pingpong_position(step, offset, path_len - 1)
        else:
            head = (path_len - 1) - (step % path_len) if direction == 1 else step
            segment = (head + path_len - (offset % path_len)) % path_len
        if segment == here:
            return max(55, 255 - offset * 30), "gradient"
    return 0, "gradient"
def sunburst_sample(col: int, row: int, led: int, now_us: int, speed: int, variant: int, option: int) -> tuple[int, str, float]:
    """Return ``(amount, role, hue)`` of one LED in Sunburst mode.

    The wall is divided into 24 angular sectors around its centre, alternately
    active and inactive. Variant 1 wobbles the pattern once per beat, variant
    2 rotates it one turn per 24 beats. Option 1 colours the sectors with the
    primary and secondary colours, option 2 with a rainbow whose hue is
    returned; otherwise active sectors use the gradient role. Inactive
    sectors are black unless option 1 is selected.
    """
    beat = spatial_beat_position(now_us, speed)
    x, y = sunburst_panel_led_position(col, row, led)
    cx, cy = physical_panel_center()
    wobble = 0.0
    if variant == 1:
        wobble = math.sin(beat * math.tau) * 0.18
    rotation = 0.0
    if variant == 2:
        rotation = beat * (math.tau / 24.0)
    angle = (math.atan2(y - cy, x - cx) + wobble - rotation) % math.tau
    active = math.cos(angle * 12.0) >= 0.0
    if option == 1:
        role = "primary" if active else "secondary"
        return 255, role, 0.0
    if not active:
        return 255, "black", 0.0
    if option == 2:
        hue = (angle / math.tau) + (beat * 8.0 / 255.0)
        return 255, "rainbow", hue
    return 255, "gradient", 0.0
def blend(primary: list[int], secondary: list[int], amount: int) -> list[int]:
    """Mix two RGB colours: amount 0 gives *secondary*, 255 gives *primary*."""
    mixed: list[int] = []
    for channel in range(3):
        low = secondary[channel]
        high = primary[channel]
        mixed.append(clamp_byte(low + (high - low) * amount / 255.0))
    return mixed
def layer_sample(mode: int, col: int, row: int, led: int, now_us: int, spatial: dict[str, Any], scene: dict[str, Any]) -> tuple[int, str]:
    """Evaluate the selected spatial mode for one LED.

    Returns ``(amount, role)`` where *amount* is the 0..255 mask value and
    *role* names the colour source ("gradient", "primary", "secondary",
    "black" or "rainbow:<hue>"). Unknown modes, including mode 0, yield
    ``(0, "gradient")``.
    """
    speed = int(scene.get("speed", 128))
    variant = int(spatial.get("variant", 0))
    direction = int(spatial.get("direction", 0))
    size = int(spatial.get("size", 64))

    # Modes that only produce a mask value; they always use the gradient
    # role. Lambdas keep mode-specific settings lazily evaluated.
    gradient_modes = {
        1: lambda: center_pulse_amount(col, row, led, now_us, speed, variant),
        2: lambda: checker_amount(col, row, led, now_us, speed, variant),
        3: lambda: arrow_amount(col, row, now_us, speed, direction, size),
        4: lambda: scan_amount(
            col, row, led, now_us, speed, size,
            int(spatial.get("angle", 0)), int(spatial.get("option", 0)), direction,
        ),
        6: lambda: wave_line_amount(col, row, now_us, speed, direction),
        7: lambda: strobe_amount(now_us, speed, size),
    }
    if mode in gradient_modes:
        return gradient_modes[mode](), "gradient"
    if mode == 5:
        return snake_sample(col, row, now_us, speed, size, int(scene.get("seed", 1)))
    if mode == 8:
        return schlaengeln_sample(col, row, now_us, speed, size, variant, direction)
    if mode == 9:
        option = int(spatial.get("option", 0))
        amount, role, hue = sunburst_sample(col, row, led, now_us, speed, variant, option)
        if role == "rainbow":
            # The hue travels inside the role string; sample_color() parses it.
            role = f"rainbow:{hue}"
        return amount, role
    return 0, "gradient"
def synthetic_effect_color(col: int, row: int, led: int, now_us: int, scene: dict[str, Any]) -> list[int]:
    """Return a stand-in RGB colour for the scene's effect at one LED.

    Effect 0 shows the scene's primary colour. Every other effect gets a
    moving hue that depends on panel, LED position, time, effect id and
    palette id; it is a preview, not the real effect output.
    """
    x, y, _side = panel_led_position(led)
    effect = int(scene.get("effect", 0))
    palette = int(scene.get("palette", 0))
    speed = int(scene.get("speed", 128))
    if effect == 0:
        return scene.get("primary", [255, 160, 80])[:3]
    phase = spatial_beat_position(now_us, speed)
    hue = (col * 0.10 + row * 0.17 + x * 0.12 + y * 0.08 + phase * 0.07 + effect * 0.013 + palette * 0.021) % 1.0
    return hsv_to_rgb(hue, 0.82, 1.0)
def sample_color(primary: list[int], secondary: list[int], effect_color: list[int], amount: int, role: str) -> list[int]:
    """Return the RGB colour of one LED for the given *role*, scaled by *amount*.

    "primary" and "secondary" use the scene colours, "black" is always
    ``[0, 0, 0]``, "rainbow:<hue>" uses a fully saturated colour of that hue
    (hue 0.0 if it cannot be parsed), and any other role uses *effect_color*.
    """
    if role == "primary":
        base = primary
    elif role == "secondary":
        base = secondary
    elif role == "black":
        return [0, 0, 0]
    elif role.startswith("rainbow:"):
        hue_text = role.partition(":")[2]
        try:
            hue = float(hue_text)
        except ValueError:
            hue = 0.0
        base = hsv_to_rgb(hue)
    else:
        base = effect_color
    scaled: list[int] = []
    for channel in base:
        scaled.append(clamp_byte(channel * amount / 255.0))
    return scaled
def is_direct_role(role: str) -> bool:
    """Tell whether *role* is "primary", "secondary" or a "rainbow:<hue>" role."""
    if role in ("primary", "secondary"):
        return True
    return role.startswith("rainbow:")
def _name_or_unknown(names: list[str], index: int) -> str:
    """Return ``names[index]``, or "Unknown" for any out-of-range index.

    Negative indices count as out of range so they cannot silently pick a
    name from the end of the list.
    """
    return names[index] if 0 <= index < len(names) else "Unknown"


def render_frame(state: dict[str, Any]) -> dict[str, Any]:
    """Render one preview frame from the master state.

    Args:
        state: Decoded master JSON. ``state["scene"]`` holds the colours,
            speed and phase, and ``scene["spatial"]`` the 2D-layer settings.
            Missing or null sections fall back to defaults.

    Returns:
        A JSON-serialisable dict with:
        ``panels`` - ``[row][col][led]`` lists of ``[r, g, b]``;
        ``panel_info`` - per-panel label and calibration text;
        ``mode_name`` / ``variant_name`` / ``direction_name`` - display names
        ("Unknown" for out-of-range values);
        plus the ``scene`` and ``node_ips`` taken from *state*.
    """
    scene = state.get("scene") or {}
    spatial = scene.get("spatial") or {}
    mode = int(spatial.get("mode", 0))
    variant = int(spatial.get("variant", 0))
    direction = int(spatial.get("direction", 0))
    strength = int(spatial.get("strength", 180))
    primary = scene.get("primary", [255, 160, 80])[:3]
    secondary = scene.get("secondary", [0, 32, 255])[:3]
    # The scene phase is added as milliseconds to the local monotonic clock.
    now_us = int(time.monotonic() * 1_000_000) + int(scene.get("phase", 0)) * 1000

    panels: list[list[list[list[int]]]] = []
    for row in range(ROWS):
        row_panels = []
        for col in range(NODE_COUNT):
            leds = []
            for led in range(LEDS_PER_PANEL):
                amount, role = layer_sample(mode, col, row, led, now_us, spatial, scene)
                # Primary/secondary/rainbow roles are shown at full mask
                # value; every other role is scaled by the layer strength.
                if not is_direct_role(role):
                    amount = clamp_byte(amount * strength / 255.0)
                if not amount:
                    # Dark LED: skip the effect-colour computation.
                    leds.append([0, 0, 0])
                    continue
                effect_color = synthetic_effect_color(col, row, led, now_us, scene)
                leds.append(sample_color(primary, secondary, effect_color, amount, role))
            row_panels.append(leds)
        panels.append(row_panels)

    panel_info = []
    for row in range(ROWS):
        info_row = []
        for col in range(NODE_COUNT):
            rotation, mirror_x, mirror_y = PANEL_TRANSFORMS[row][col]
            info_row.append(
                {
                    "label": f"ESP{col + 1} {OUTPUT_LABELS[row]}",
                    "transform": f"r{rotation * 90} mx{int(mirror_x)} my{int(mirror_y)}",
                }
            )
        panel_info.append(info_row)

    # Schlängeln (8) and Sunburst (9) have their own variant names; a variant
    # outside the mode's own list is reported as "Unknown".
    variant_names = {8: SCHLAENGELN_VARIANT_NAMES, 9: SUNBURST_VARIANT_NAMES}.get(mode, VARIANT_NAMES)

    return {
        "scene": scene,
        "node_ips": state.get("node_ips", []),
        "panels": panels,
        "panel_info": panel_info,
        "mode_name": _name_or_unknown(MODE_NAMES, mode),
        "variant_name": _name_or_unknown(variant_names, variant),
        "direction_name": _name_or_unknown(DIRECTION_NAMES, direction),
        "note": "2D layer with synthetic effect-color preview",
    }
class VisualizerServer(ThreadingHTTPServer):
    """Threading HTTP server that also carries the visualizer's settings."""

    allow_reuse_address = True
    # Default master host; assigned in main() and used by
    # Handler.master_from_query() when the request names no master.
    master: str
    # Timeout in seconds passed to urlopen() when fetching the master state.
    timeout_s: float
class Handler(BaseHTTPRequestHandler):
    """Request handler for the visualizer page and its JSON endpoints.

    Routes:
        ``/``              - the HTML page.
        ``/api/infinity``  - the master's ``/json/infinity`` state, proxied.
        ``/api/frame``     - a frame rendered from the master state.
        ``/health``        - a static ``{"ok":true}`` reply.

    Both API routes accept an optional ``master=<host>`` query parameter that
    overrides the server's default master.
    """

    server: VisualizerServer

    def log_message(self, fmt: str, *args: Any) -> None:
        """Log a request line to stderr, skipping failed API requests."""
        message = fmt % args
        # 502/504 replies on /api/ routes are not logged.
        if '"GET /api/' in message and (' 502 ' in message or ' 504 ' in message):
            return
        sys.stderr.write("%s - %s\n" % (self.log_date_time_string(), message))

    def send_bytes(self, status: int, content_type: str, body: bytes) -> None:
        """Send a complete, uncached response; ignore clients that went away."""
        try:
            self.send_response(status)
            self.send_header("Content-Type", content_type)
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Cache-Control", "no-store")
            self.end_headers()
            self.wfile.write(body)
        except (BrokenPipeError, ConnectionResetError):
            # The client disconnected mid-response; nothing useful to do.
            pass

    def _send_json(self, status: int, payload: Any, *, compact: bool = False) -> None:
        """Serialise *payload* as JSON and send it with the given status."""
        if compact:
            text = json.dumps(payload, separators=(",", ":"))
        else:
            text = json.dumps(payload)
        self.send_bytes(status, "application/json", text.encode("utf-8"))

    def do_GET(self) -> None:
        """Dispatch a GET request to the matching route."""
        if self.path == "/" or self.path.startswith("/?"):
            self.send_bytes(200, "text/html; charset=utf-8", HTML.encode("utf-8"))
            return
        if self.path.startswith("/api/infinity"):
            self.proxy_infinity(self.master_from_query())
            return
        if self.path.startswith("/api/frame"):
            self.proxy_frame(self.master_from_query())
            return
        if self.path == "/health":
            self.send_bytes(200, "application/json", b'{"ok":true}')
            return
        self.send_bytes(404, "text/plain; charset=utf-8", b"not found")

    def master_from_query(self) -> str:
        """Return the ``master`` query parameter, or the server default.

        A missing or blank parameter falls back to ``self.server.master``.
        """
        values = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        return values.get("master", [self.server.master])[0].strip() or self.server.master

    def fetch_master_state(self, master: str) -> dict[str, Any]:
        """Fetch and decode ``http://<master>/json/infinity``.

        Raises whatever ``urlopen``, ``read`` or JSON decoding raise; the
        callers translate those into HTTP error replies.
        """
        # NOTE(review): *master* may come straight from the request's query
        # string and is inserted into the URL without validation.
        url = f"http://{master}/json/infinity"
        with urllib.request.urlopen(url, timeout=self.server.timeout_s) as response:
            body = response.read()
        return json.loads(body.decode("utf-8"))

    def proxy_infinity(self, master: str) -> None:
        """Reply with the master's state, or a JSON error description.

        504 means the master could not be reached, 502 that its answer could
        not be used.
        """
        try:
            state = self.fetch_master_state(master)
        except OSError as exc:
            # URLError, socket.timeout and TimeoutError are OSError
            # subclasses; this also covers resets while reading the body.
            self._send_json(504, {"error": "master unreachable", "detail": str(exc)})
            return
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            self._send_json(502, {"error": f"invalid master JSON: {exc}"})
            return
        except Exception as exc:  # keep the operator UI alive and explicit
            self._send_json(502, {"error": f"master request failed: {exc}"})
            return
        self._send_json(200, state)

    def proxy_frame(self, master: str) -> None:
        """Reply with a frame rendered from the master's state.

        504 means the master could not be reached, 502 that fetching,
        decoding or rendering failed for any other reason.
        """
        try:
            state = self.fetch_master_state(master)
            frame = render_frame(state)
        except OSError as exc:
            # URLError, socket.timeout and TimeoutError are OSError
            # subclasses; this also covers resets while reading the body.
            self._send_json(504, {"error": "master unreachable", "detail": str(exc)})
            return
        except Exception as exc:  # keep the operator UI alive and explicit
            self._send_json(502, {"error": f"frame render failed: {exc}"})
            return
        self._send_json(200, frame, compact=True)
def parse_args() -> argparse.Namespace:
    """Parse the visualizer's command-line options from ``sys.argv``."""
    parser = argparse.ArgumentParser(description="Serve a local Infinity Global-2D visualizer.")
    options = (
        ("--master", {"default": "10.42.0.213", "help": "Infinity master IP or hostname"}),
        ("--bind", {"default": "127.0.0.1", "help": "Local bind address"}),
        ("--port", {"type": int, "default": 8765, "help": "Local HTTP port"}),
        ("--no-port-fallback", {"action": "store_true", "help": "Fail instead of trying the next ports when busy"}),
        ("--timeout", {"type": float, "default": 1.2, "help": "Master request timeout in seconds"}),
    )
    for flag, settings in options:
        parser.add_argument(flag, **settings)
    return parser.parse_args()
def bind_server(args: argparse.Namespace) -> VisualizerServer:
    """Create the HTTP server on the requested port or a nearby free one.

    Unless ``args.no_port_fallback`` is set, up to 50 consecutive ports
    starting at ``args.port`` are tried while the port is reported busy.

    Raises:
        SystemExit: every tried port was busy.
        OSError: binding failed for a reason other than a busy port.
        RuntimeError: no port could be tried at all.
    """
    if args.no_port_fallback:
        ports = range(args.port, args.port + 1)
    else:
        # Stop at the highest valid TCP port; binding beyond 65535 raises
        # OverflowError rather than OSError.
        ports = range(args.port, min(args.port + 50, 65536))

    last_error: OSError | None = None
    for port in ports:
        try:
            server = VisualizerServer((args.bind, port), Handler)
        except OSError as exc:
            last_error = exc
            if exc.errno != errno.EADDRINUSE:
                # Not a busy port: trying other ports will not help.
                break
            continue
        if port != args.port:
            print(f"Port {args.port} is busy, using {port} instead.")
        return server

    if last_error is not None and last_error.errno == errno.EADDRINUSE:
        if len(ports) == 1:
            raise SystemExit(f"Could not start visualizer: port {ports[0]} is busy.")
        raise SystemExit(f"Could not start visualizer: ports {ports[0]}-{ports[-1]} are busy.")
    raise last_error or RuntimeError("Could not bind visualizer server")
def main() -> int:
    """Start the visualizer server and serve until interrupted.

    Returns 0 once the server has been shut down by Ctrl-C.
    """
    args = parse_args()
    server = bind_server(args)
    server.master, server.timeout_s = args.master, args.timeout
    host, port = server.server_address[:2]
    banner = (
        f"Infinity visualizer: http://{host}:{port}/?master={args.master}",
        f"Proxying master: http://{args.master}/json/infinity",
        "Rendering Infinity Global-2D layer with synthetic effect colors and panel calibration.",
    )
    for line in banner:
        print(line)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nStopped.")
    finally:
        # Always release the listening socket, whatever ended the loop.
        server.server_close()
    return 0
# Run the server only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())