from __future__ import annotations from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass import ipaddress import json import socket import struct import time from typing import Callable, Iterable, Sequence from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen from app.config.models import InfinityMirrorConfig WLED_INFO_TIMEOUT_S = 0.35 WLED_DISCOVERY_WORKERS = 32 WLED_DDP_PORT = 4048 WLED_DDP_HEADER_LENGTH = 10 WLED_DDP_MAX_DATA_LENGTH = 1440 WLED_DDP_VERSION_1 = 0x40 WLED_DDP_PUSH_FLAG = 0x01 WLED_DDP_RGB888 = 0x0B WLED_DDP_DESTINATION_ID = 1 @dataclass(frozen=True) class DiscoveredWledDevice: ip_address: str hostname: str = "" instance_name: str = "" mac_address: str = "" led_count: int = 0 info_endpoint: str = "" def normalize_mac_address(value: str) -> str: raw = str(value or "").strip().replace("-", ":").upper() if not raw: return "" parts = [part.zfill(2) for part in raw.split(":") if part] if len(parts) == 6 and all(len(part) == 2 for part in parts): return ":".join(parts) return raw def fetch_wled_info(host: str, timeout_s: float = WLED_INFO_TIMEOUT_S) -> tuple[dict[str, object], str] | None: normalized_host = str(host or "").strip() if not normalized_host: return None for endpoint in ("/json/info", "/json"): payload = _load_json(f"http://{normalized_host}{endpoint}", timeout_s=timeout_s) if not isinstance(payload, dict): continue if endpoint == "/json/info": if _looks_like_wled_info(payload): return payload, endpoint continue info = payload.get("info") if isinstance(info, dict) and _looks_like_wled_info(info): return info, endpoint return None def probe_wled_device(host: str, timeout_s: float = WLED_INFO_TIMEOUT_S) -> DiscoveredWledDevice | None: info_result = fetch_wled_info(host, timeout_s=timeout_s) if info_result is None: return None info, endpoint = info_result hostname = _string_value(info.get("mdns")) or _reverse_dns_name(host) instance_name = _string_value(info.get("name")) mac_address = normalize_mac_address(_string_value(info.get("mac"))) leds = info.get("leds") led_count = 0 if isinstance(leds, dict): count = leds.get("count") if isinstance(count, str): try: led_count = int(count) except ValueError: led_count = 0 elif isinstance(count, (int, float)): led_count = int(count) return DiscoveredWledDevice( ip_address=str(host).strip(), hostname=hostname, instance_name=instance_name, mac_address=mac_address, led_count=max(0, led_count), info_endpoint=endpoint, ) def scan_candidate_subnets(config: InfinityMirrorConfig | None = None) -> list[ipaddress.IPv4Network]: networks: list[ipaddress.IPv4Network] = [] seen: set[str] = set() for address in _candidate_ipv4_addresses(config): try: ip_value = ipaddress.ip_address(address) except ValueError: continue if not isinstance(ip_value, ipaddress.IPv4Address): continue if ip_value.is_loopback or ip_value.is_link_local or ip_value.is_unspecified: continue network = ipaddress.ip_network(f"{ip_value}/24", strict=False) key = str(network) if key in seen: continue seen.add(key) networks.append(network) return sorted(networks, key=lambda network: int(network.network_address)) def build_scan_hosts(config: InfinityMirrorConfig | None = None, max_subnets: int = 3) -> list[str]: preferred_hosts = [tile.controller_ip.strip() for tile in config.sorted_tiles()] if config is not None else [] prioritized = [host for host in preferred_hosts if host] hosts: list[str] = [] seen: set[str] = set() for host in prioritized: if host not in seen: seen.add(host) hosts.append(host) for network in scan_candidate_subnets(config)[: max(1, max_subnets)]: for host in network.hosts(): text = str(host) if text in seen: continue seen.add(text) hosts.append(text) return hosts def discover_wled_devices( hosts: Sequence[str], *, timeout_s: float = WLED_INFO_TIMEOUT_S, max_workers: int = WLED_DISCOVERY_WORKERS, progress_callback: Callable[[int, int, DiscoveredWledDevice | None], None] | None = None, ) -> list[DiscoveredWledDevice]: unique_hosts = [str(host).strip() for host in hosts if str(host).strip()] total = len(unique_hosts) if total == 0: return [] devices: list[DiscoveredWledDevice] = [] completed = 0 seen_device_keys: set[str] = set() with ThreadPoolExecutor(max_workers=max(1, min(max_workers, total))) as executor: future_map = { executor.submit(probe_wled_device, host, timeout_s): host for host in unique_hosts } for future in as_completed(future_map): completed += 1 device: DiscoveredWledDevice | None = None try: device = future.result() except Exception: device = None if device is not None: key = device.mac_address or device.ip_address if key not in seen_device_keys: seen_device_keys.add(key) devices.append(device) if progress_callback is not None: progress_callback(completed, total, device) return sorted(devices, key=lambda item: tuple(int(part) for part in item.ip_address.split("."))) def identify_wled_device( host: str, *, led_count: int | None = None, duration_s: float = 1.6, pulse_interval_s: float = 0.22, ) -> None: device = probe_wled_device(host, timeout_s=max(WLED_INFO_TIMEOUT_S, 0.45)) if device is None: raise OSError(f"WLED device at {host} is unreachable.") pixel_count = int(led_count or device.led_count) if pixel_count <= 0: raise ValueError(f"Unable to determine LED count for {host}.") on_payload = bytes((255, 32, 32)) * pixel_count off_payload = bytes((0, 0, 0)) * pixel_count deadline = time.monotonic() + max(0.2, float(duration_s)) pulse_delay = max(0.08, float(pulse_interval_s)) sequence = 1 visible_phase = True with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: while True: _send_ddp_frame(sock, host, on_payload if visible_phase else off_payload, sequence) sequence = 1 if sequence >= 15 else sequence + 1 remaining = deadline - time.monotonic() if remaining <= 0: break time.sleep(min(pulse_delay, remaining)) visible_phase = not visible_phase _send_ddp_frame(sock, host, off_payload, sequence) def _candidate_ipv4_addresses(config: InfinityMirrorConfig | None = None) -> Iterable[str]: if config is not None: for tile in config.sorted_tiles(): controller_ip = tile.controller_ip.strip() if controller_ip: yield controller_ip yielded: set[str] = set() for host in {socket.gethostname(), socket.getfqdn()}: try: _, _, addresses = socket.gethostbyname_ex(host) except OSError: continue for address in addresses: if address and address not in yielded: yielded.add(address) yield address try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.connect(("8.8.8.8", 80)) address = sock.getsockname()[0] if address and address not in yielded: yield address except OSError: return def _load_json(url: str, *, timeout_s: float) -> dict[str, object] | None: request = Request(url, headers={"Accept": "application/json"}) try: with urlopen(request, timeout=timeout_s) as response: payload = json.loads(response.read().decode("utf-8")) except (HTTPError, URLError, OSError, TimeoutError, json.JSONDecodeError, UnicodeDecodeError): return None return payload if isinstance(payload, dict) else None def _looks_like_wled_info(payload: dict[str, object]) -> bool: if not isinstance(payload, dict): return False if "leds" not in payload: return False return any(key in payload for key in ("name", "ver", "mac")) def _reverse_dns_name(host: str) -> str: try: name, _, _ = socket.gethostbyaddr(host) except OSError: return "" return "" if name == host else name def _string_value(value: object) -> str: return value.strip() if isinstance(value, str) else "" def _send_ddp_frame(sock: socket.socket, host: str, payload: bytes, sequence: int) -> None: for offset in range(0, len(payload), WLED_DDP_MAX_DATA_LENGTH): chunk = payload[offset : offset + WLED_DDP_MAX_DATA_LENGTH] last = offset + WLED_DDP_MAX_DATA_LENGTH >= len(payload) header = struct.pack( "!BBBBLH", WLED_DDP_VERSION_1 | (WLED_DDP_PUSH_FLAG if last else 0), sequence, WLED_DDP_RGB888, WLED_DDP_DESTINATION_ID, offset, len(chunk), ) sock.sendto(header + chunk, (host, WLED_DDP_PORT))