293 lines
9.4 KiB
Python
293 lines
9.4 KiB
Python
from __future__ import annotations
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from dataclasses import dataclass
|
|
import ipaddress
|
|
import json
|
|
import socket
|
|
import struct
|
|
import time
|
|
from typing import Callable, Iterable, Sequence
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.request import Request, urlopen
|
|
|
|
from app.config.models import InfinityMirrorConfig
|
|
|
|
# Default timeout, in seconds, used by the probe/fetch helpers for WLED JSON requests.
WLED_INFO_TIMEOUT_S = 0.35
# Default upper bound on the number of probe threads used by discover_wled_devices.
WLED_DISCOVERY_WORKERS = 32
# UDP port that _send_ddp_frame sends DDP datagrams to.
WLED_DDP_PORT = 4048
# DDP header size in bytes; equals the packed size of the "!BBBBLH" layout used by
# _send_ddp_frame. Not referenced elsewhere in the visible part of this file.
WLED_DDP_HEADER_LENGTH = 10
# Largest pixel-data slice placed in a single DDP datagram; longer payloads are split.
WLED_DDP_MAX_DATA_LENGTH = 1440
# Base value of the first DDP header byte (flags/version).
WLED_DDP_VERSION_1 = 0x40
# Bit OR-ed into the first header byte on the final datagram of a frame.
WLED_DDP_PUSH_FLAG = 0x01
# Value written to the third DDP header byte (data type); name indicates RGB, 8 bits per channel.
WLED_DDP_RGB888 = 0x0B
# Value written to the fourth DDP header byte (destination id).
WLED_DDP_DESTINATION_ID = 1
|
|
|
|
|
|
@dataclass(frozen=True)
class DiscoveredWledDevice:
    """Immutable record describing one WLED controller that answered a probe.

    Only ``ip_address`` is required; every other field falls back to an
    empty string (or ``0`` for ``led_count``) when the device did not
    report it.
    """

    # Host text the device was probed at (whitespace-stripped by the prober).
    ip_address: str
    # mDNS name reported by the device, or a reverse-DNS name, or "".
    hostname: str = ""
    # Value of the "name" entry of the device's info object, or "".
    instance_name: str = ""
    # Normalized MAC address text, or "".
    mac_address: str = ""
    # Number of LEDs reported by the device; 0 when unknown.
    led_count: int = 0
    # JSON endpoint path that produced the info object ("/json/info" or "/json").
    info_endpoint: str = ""
|
|
|
|
|
|
def normalize_mac_address(value: str) -> str:
    """Return *value* as an upper-case, colon-separated MAC address when possible.

    Accepted shapes:

    * six groups separated by ``:`` or ``-`` (single-digit groups are
      zero-padded), e.g. ``"a-b-c-d-e-f"`` -> ``"0A:0B:0C:0D:0E:0F"``;
    * twelve hexadecimal digits with no separators, e.g.
      ``"aabbccddeeff"`` -> ``"AA:BB:CC:DD:EE:FF"``.

    Anything else is returned trimmed and upper-cased (with ``-`` replaced
    by ``:``) but otherwise unchanged. Empty or falsy input yields ``""``.
    """
    raw = str(value or "").strip().replace("-", ":").upper()
    if not raw:
        return ""

    # Separator-less form: split into octets so it compares equal to the
    # colon-separated spelling of the same address.
    if len(raw) == 12 and all(char in "0123456789ABCDEF" for char in raw):
        return ":".join(raw[index : index + 2] for index in range(0, 12, 2))

    parts = [part.zfill(2) for part in raw.split(":") if part]
    if len(parts) == 6 and all(len(part) == 2 for part in parts):
        return ":".join(parts)
    return raw
|
|
|
|
|
|
def fetch_wled_info(host: str, timeout_s: float = WLED_INFO_TIMEOUT_S) -> tuple[dict[str, object], str] | None:
    """Fetch the WLED info object from *host* over HTTP.

    ``/json/info`` is tried first; if it does not yield a WLED-looking
    object, ``/json`` is requested and its ``"info"`` member is used.

    Returns:
        ``(info, endpoint)`` where *endpoint* is the path that succeeded,
        or ``None`` when *host* is blank or neither endpoint produced an
        object accepted by ``_looks_like_wled_info``.
    """
    target = str(host or "").strip()
    if not target:
        return None

    # Dedicated info endpoint: the response body is the info object itself.
    direct = _load_json(f"http://{target}/json/info", timeout_s=timeout_s)
    if isinstance(direct, dict) and _looks_like_wled_info(direct):
        return direct, "/json/info"

    # Combined endpoint: the info object is nested under the "info" key.
    combined = _load_json(f"http://{target}/json", timeout_s=timeout_s)
    if isinstance(combined, dict):
        nested = combined.get("info")
        if isinstance(nested, dict) and _looks_like_wled_info(nested):
            return nested, "/json"

    return None
|
|
|
|
|
|
def probe_wled_device(host: str, timeout_s: float = WLED_INFO_TIMEOUT_S) -> DiscoveredWledDevice | None:
    """Probe *host* and describe it as a ``DiscoveredWledDevice``.

    Returns ``None`` when ``fetch_wled_info`` finds no WLED info object.
    The hostname prefers the device's ``"mdns"`` entry and falls back to a
    reverse-DNS lookup; the LED count is read from ``info["leds"]["count"]``
    and clamped to be non-negative.
    """
    fetched = fetch_wled_info(host, timeout_s=timeout_s)
    if fetched is None:
        return None
    info, endpoint = fetched

    mdns_name = _string_value(info.get("mdns"))
    resolved_hostname = mdns_name if mdns_name else _reverse_dns_name(host)
    display_name = _string_value(info.get("name"))
    mac_text = normalize_mac_address(_string_value(info.get("mac")))

    led_section = info.get("leds")
    raw_count = led_section.get("count") if isinstance(led_section, dict) else None

    # The count may arrive as a numeric string or a JSON number; anything
    # else (or an unparsable string) is treated as "unknown" -> 0.
    if isinstance(raw_count, str):
        try:
            parsed_count = int(raw_count)
        except ValueError:
            parsed_count = 0
    elif isinstance(raw_count, (int, float)):
        parsed_count = int(raw_count)
    else:
        parsed_count = 0

    return DiscoveredWledDevice(
        ip_address=str(host).strip(),
        hostname=resolved_hostname,
        instance_name=display_name,
        mac_address=mac_text,
        led_count=max(0, parsed_count),
        info_endpoint=endpoint,
    )
|
|
|
|
|
|
def scan_candidate_subnets(config: InfinityMirrorConfig | None = None) -> list[ipaddress.IPv4Network]:
    """Return the distinct /24 IPv4 networks worth scanning, in ascending order.

    Every address produced by ``_candidate_ipv4_addresses`` is widened to
    its enclosing /24 network. Entries that are not valid IPv4 addresses,
    or that are loopback, link-local or unspecified, are ignored.
    """
    by_cidr: dict[str, ipaddress.IPv4Network] = {}

    for candidate in _candidate_ipv4_addresses(config):
        try:
            parsed = ipaddress.ip_address(candidate)
        except ValueError:
            # Host names and malformed text cannot be mapped to a subnet.
            continue

        usable = isinstance(parsed, ipaddress.IPv4Address) and not (
            parsed.is_loopback or parsed.is_link_local or parsed.is_unspecified
        )
        if not usable:
            continue

        subnet = ipaddress.ip_network(f"{parsed}/24", strict=False)
        # Keep only the first occurrence of each network.
        by_cidr.setdefault(str(subnet), subnet)

    return sorted(by_cidr.values(), key=lambda subnet: int(subnet.network_address))
|
|
|
|
|
|
def build_scan_hosts(config: InfinityMirrorConfig | None = None, max_subnets: int = 3) -> list[str]:
    """Build the ordered, duplicate-free list of hosts to probe.

    Controller hosts configured on the tiles of *config* come first (in
    ``sorted_tiles()`` order), followed by every host address of the first
    ``max(1, max_subnets)`` networks returned by ``scan_candidate_subnets``.
    """
    # A dict keeps first-seen order while rejecting repeats.
    ordered: dict[str, None] = {}

    if config is not None:
        configured_hosts = [tile.controller_ip.strip() for tile in config.sorted_tiles()]
        for configured in configured_hosts:
            if configured:
                ordered.setdefault(configured, None)

    subnet_limit = max(1, max_subnets)
    for subnet in scan_candidate_subnets(config)[:subnet_limit]:
        for member in subnet.hosts():
            ordered.setdefault(str(member), None)

    return list(ordered)
|
|
|
|
|
|
def _wled_device_sort_key(device: DiscoveredWledDevice) -> tuple[int, tuple[int, ...], str]:
    """Sort dotted-numeric addresses numerically, then any other host text alphabetically."""
    address = device.ip_address
    try:
        return (0, tuple(int(part) for part in address.split(".")), "")
    except ValueError:
        # Not purely numeric (e.g. a host name such as "wled.local").
        return (1, (), address.lower())


def discover_wled_devices(
    hosts: Sequence[str],
    *,
    timeout_s: float = WLED_INFO_TIMEOUT_S,
    max_workers: int = WLED_DISCOVERY_WORKERS,
    progress_callback: Callable[[int, int, DiscoveredWledDevice | None], None] | None = None,
) -> list[DiscoveredWledDevice]:
    """Probe *hosts* concurrently and return the WLED devices that answered.

    Args:
        hosts: Host names or addresses to probe. Entries are stripped;
            blank entries and repeats are dropped before probing.
        timeout_s: Per-request timeout handed to ``probe_wled_device``.
        max_workers: Upper bound on probe threads; the pool never exceeds
            the number of hosts and always has at least one worker.
        progress_callback: Optional ``callback(completed, total, device)``
            invoked from the calling thread after each probe finishes.
            *device* is ``None`` when the host is not a WLED device or the
            probe failed.

    Returns:
        One entry per distinct device (keyed by MAC address, falling back
        to the probed address), ordered numerically by dotted address with
        non-numeric host names last.
    """
    # dict.fromkeys removes repeats while keeping first-seen order, so no
    # host is probed (or counted in `total`) twice.
    unique_hosts = list(dict.fromkeys(text for text in (str(host).strip() for host in hosts) if text))
    total = len(unique_hosts)
    if total == 0:
        return []

    devices: list[DiscoveredWledDevice] = []
    seen_device_keys: set[str] = set()

    with ThreadPoolExecutor(max_workers=max(1, min(max_workers, total))) as executor:
        futures = [executor.submit(probe_wled_device, host, timeout_s) for host in unique_hosts]
        for completed, future in enumerate(as_completed(futures), start=1):
            device: DiscoveredWledDevice | None
            try:
                device = future.result()
            except Exception:
                # Discovery is best-effort: a probe that blows up is treated
                # the same as a host that is not a WLED device.
                device = None

            if device is not None:
                key = device.mac_address or device.ip_address
                if key not in seen_device_keys:
                    seen_device_keys.add(key)
                    devices.append(device)

            if progress_callback is not None:
                progress_callback(completed, total, device)

    # A plain int() per octet would raise ValueError for host names, losing
    # the whole scan result; the key helper tolerates them.
    return sorted(devices, key=_wled_device_sort_key)
|
|
|
|
|
|
def identify_wled_device(
    host: str,
    *,
    led_count: int | None = None,
    duration_s: float = 1.6,
    pulse_interval_s: float = 0.22,
) -> None:
    """Blink the LEDs of the WLED device at *host* so it can be spotted.

    The device is probed first; then alternating lit/dark DDP frames are
    sent over UDP until the duration elapses, and a final dark frame is
    always sent last.

    Args:
        host: Address of the device.
        led_count: Number of pixels to drive; when falsy, the count
            reported by the device is used.
        duration_s: Total blink time in seconds (at least 0.2 is used).
        pulse_interval_s: Pause between frames in seconds (at least 0.08
            is used).

    Raises:
        OSError: If the probe finds no WLED device at *host*.
        ValueError: If no positive LED count can be determined.
    """
    probed = probe_wled_device(host, timeout_s=max(WLED_INFO_TIMEOUT_S, 0.45))
    if probed is None:
        raise OSError(f"WLED device at {host} is unreachable.")

    pixel_count = int(led_count or probed.led_count)
    if pixel_count <= 0:
        raise ValueError(f"Unable to determine LED count for {host}.")

    lit_frame = b"\xff\x20\x20" * pixel_count  # RGB (255, 32, 32) per pixel
    dark_frame = bytes(3 * pixel_count)  # all channels zero
    frames = (lit_frame, dark_frame)

    stop_at = time.monotonic() + max(0.2, float(duration_s))
    pause = max(0.08, float(pulse_interval_s))

    # `sent` counts frames already transmitted. Even frames are lit, odd
    # frames dark, and the sequence number cycles 1..15.
    sent = 0
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp:
        while True:
            _send_ddp_frame(udp, host, frames[sent % 2], sent % 15 + 1)
            sent += 1

            time_left = stop_at - time.monotonic()
            if time_left <= 0:
                break
            time.sleep(min(pause, time_left))

        # Leave the strip dark regardless of which phase the loop ended on.
        _send_ddp_frame(udp, host, dark_frame, sent % 15 + 1)
|
|
|
|
|
|
def _candidate_ipv4_addresses(config: InfinityMirrorConfig | None = None) -> Iterable[str]:
|
|
if config is not None:
|
|
for tile in config.sorted_tiles():
|
|
controller_ip = tile.controller_ip.strip()
|
|
if controller_ip:
|
|
yield controller_ip
|
|
|
|
yielded: set[str] = set()
|
|
for host in {socket.gethostname(), socket.getfqdn()}:
|
|
try:
|
|
_, _, addresses = socket.gethostbyname_ex(host)
|
|
except OSError:
|
|
continue
|
|
for address in addresses:
|
|
if address and address not in yielded:
|
|
yielded.add(address)
|
|
yield address
|
|
|
|
try:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
|
|
sock.connect(("8.8.8.8", 80))
|
|
address = sock.getsockname()[0]
|
|
if address and address not in yielded:
|
|
yield address
|
|
except OSError:
|
|
return
|
|
|
|
|
|
def _load_json(url: str, *, timeout_s: float) -> dict[str, object] | None:
|
|
request = Request(url, headers={"Accept": "application/json"})
|
|
try:
|
|
with urlopen(request, timeout=timeout_s) as response:
|
|
payload = json.loads(response.read().decode("utf-8"))
|
|
except (HTTPError, URLError, OSError, TimeoutError, json.JSONDecodeError, UnicodeDecodeError):
|
|
return None
|
|
return payload if isinstance(payload, dict) else None
|
|
|
|
|
|
def _looks_like_wled_info(payload: dict[str, object]) -> bool:
|
|
if not isinstance(payload, dict):
|
|
return False
|
|
if "leds" not in payload:
|
|
return False
|
|
return any(key in payload for key in ("name", "ver", "mac"))
|
|
|
|
|
|
def _reverse_dns_name(host: str) -> str:
    """Return the reverse-DNS name for *host*, or ``""`` if there is none.

    An empty string is returned when the lookup raises ``OSError`` or when
    the resolver merely echoes *host* back.
    """
    try:
        resolved = socket.gethostbyaddr(host)[0]
    except OSError:
        return ""
    if resolved == host:
        return ""
    return resolved
|
|
|
|
|
|
def _string_value(value: object) -> str:
|
|
return value.strip() if isinstance(value, str) else ""
|
|
|
|
|
|
def _send_ddp_frame(sock: socket.socket, host: str, payload: bytes, sequence: int) -> None:
    """Send *payload* to *host* as one or more DDP datagrams.

    The payload is cut into slices of at most ``WLED_DDP_MAX_DATA_LENGTH``
    bytes. Each datagram carries a 10-byte header (flags, sequence, data
    type, destination id, byte offset, slice length) followed by the
    slice; the push flag is set only on the final datagram. An empty
    payload sends nothing.
    """
    total_length = len(payload)
    destination = (host, WLED_DDP_PORT)

    for start in range(0, total_length, WLED_DDP_MAX_DATA_LENGTH):
        end = start + WLED_DDP_MAX_DATA_LENGTH
        segment = payload[start:end]

        flags = WLED_DDP_VERSION_1
        if end >= total_length:
            # Last slice of the frame: set the push bit.
            flags |= WLED_DDP_PUSH_FLAG

        header = struct.pack(
            "!BBBBLH",
            flags,
            sequence,
            WLED_DDP_RGB888,
            WLED_DDP_DESTINATION_ID,
            start,
            len(segment),
        )
        sock.sendto(header + segment, destination)
|