389 lines
14 KiB
Python
Executable File
389 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Discover WLED devices in local networks and flash them sequentially via OTA.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import ipaddress
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
import requests
|
|
|
|
|
|
# Default value of the `discover` subcommand's --output option: the file the
# discovered host list is written to. Being relative, it is resolved against
# the current working directory.
DEFAULT_OUTPUT_FILE = "tools/discovered_wled_hosts.txt"
|
|
|
|
|
|
@dataclass
class WledHost:
    """Identity of one WLED device, as gathered by probing ``/json/info``."""

    # Address the device was probed at.
    ip: str
    # Device name; the prober substitutes "WLED" when none is reported.
    name: str
    # Firmware version string (the "ver" field of the info response).
    version: str
    # Architecture string; the prober substitutes "-" when none is reported.
    arch: str
|
|
|
|
|
|
@dataclass
class WledInfo(WledHost):
    """A :class:`WledHost` extended with the uptime from the same probe."""

    # Seconds since boot (the "uptime" field); used to detect a reboot by
    # comparing the value before and after an upload.
    uptime_s: int
|
|
|
|
|
|
def _run(cmd: list[str]) -> str:
    """Execute *cmd* and return what it wrote to stdout, decoded as text.

    stderr is captured as well (and discarded). A non-zero exit status raises
    ``subprocess.CalledProcessError`` because ``check=True`` is used.
    """
    completed = subprocess.run(
        cmd,
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    return completed.stdout
|
|
|
|
|
|
def local_networks() -> list[ipaddress.IPv4Network]:
    """
    Read active IPv4 interfaces from `ip` and return private subnets.

    Runs ``ip -o -4 addr show scope global`` and, for every interface whose
    address is private, collects the network it belongs to. Networks holding
    more than 2048 addresses are narrowed to the /24 around the interface
    address so a default scan never covers a very large range. Duplicate
    networks are dropped; the order follows the command output.

    Returns:
        The list of networks, or an empty list when the ``ip`` tool is
        missing or exits with an error, so callers can tell the user to pass
        subnets explicitly instead of crashing with a traceback.
    """
    try:
        out = _run(["ip", "-o", "-4", "addr", "show", "scope", "global"])
    except (OSError, subprocess.CalledProcessError):
        # No iproute2 on this system (or it failed): nothing to auto-detect.
        return []

    nets: list[ipaddress.IPv4Network] = []
    seen: set[str] = set()
    for line in out.splitlines():
        match = re.search(r"\binet\s+(\d+\.\d+\.\d+\.\d+/\d+)\b", line)
        if not match:
            continue
        try:
            iface_ip = ipaddress.IPv4Interface(match.group(1))
        except ValueError:
            # The regex is loose (e.g. octets > 255); skip anything invalid.
            continue
        if not iface_ip.ip.is_private:
            continue
        net = iface_ip.network
        if net.num_addresses > 2048:
            # avoid accidentally scanning very large ranges by default
            net = ipaddress.ip_network(f"{iface_ip.ip}/24", strict=False)
        key = str(net)
        if key in seen:
            continue
        seen.add(key)
        nets.append(net)
    return nets
|
|
|
|
|
|
def parse_networks(raw: Iterable[str] | None) -> list[ipaddress.IPv4Network]:
    """Turn user-supplied CIDR strings into IPv4 networks.

    Args:
        raw: CIDR strings such as ``"192.168.1.0/24"``. Host bits may be set
            (``"192.168.1.5/24"`` is accepted) and surrounding whitespace is
            ignored. When ``None`` or empty, the networks of the local
            interfaces are auto-detected instead.

    Returns:
        One network per input string, in input order.

    Raises:
        ValueError: if an entry is not a valid IPv4 network. IPv6 input is
            rejected on purpose: the scanner enumerates every host of a
            network and sorts results as dotted quads.
    """
    if not raw:
        return local_networks()
    return [ipaddress.IPv4Network(item.strip(), strict=False) for item in raw]
|
|
|
|
|
|
def probe_wled_info(ip: str, timeout_s: float) -> WledInfo | None:
    """Query ``http://<ip>/json/info`` and describe the WLED device behind it.

    Args:
        ip: Address (or host name) to probe.
        timeout_s: Timeout handed to ``requests.get``.

    Returns:
        A ``WledInfo`` when the endpoint answers HTTP 200 with a JSON object
        containing a non-empty ``"ver"`` field; ``None`` for anything else
        (request error, other status, invalid JSON, JSON that is not an
        object, missing version). Missing name/arch fall back to ``"WLED"``
        and ``"-"``; a missing or unparsable uptime falls back to ``0``.
    """
    url = f"http://{ip}/json/info"
    try:
        resp = requests.get(url, timeout=timeout_s)
        if resp.status_code != 200:
            return None
        data = resp.json()
    except (requests.RequestException, ValueError):
        return None

    # Any HTTP device on the network may answer this path with arbitrary JSON
    # (a list, a string, ...); only an object can be a WLED info document.
    if not isinstance(data, dict):
        return None

    def field(key: str) -> str:
        # JSON null must count as "missing", not as the text "None".
        value = data.get(key)
        return "" if value is None else str(value).strip()

    # WLED info endpoint typically includes "name", "ver", and "arch"
    ver = field("ver")
    if not ver:
        return None
    name = field("name") or "WLED"
    arch = field("arch") or "-"

    try:
        uptime_s = int(data.get("uptime", 0) or 0)
    except (TypeError, ValueError):
        # A malformed uptime must not abort the probe (or a whole scan).
        uptime_s = 0
    return WledInfo(ip=ip, name=name, version=ver, arch=arch, uptime_s=uptime_s)
|
|
|
|
|
|
def probe_wled(ip: str, timeout_s: float) -> WledHost | None:
    """Probe *ip* and return its identity without the uptime.

    Delegates to ``probe_wled_info`` and copies ip, name, version and arch
    into a plain ``WledHost``; returns ``None`` when that probe finds nothing.
    """
    details = probe_wled_info(ip, timeout_s)
    if details is not None:
        return WledHost(
            ip=details.ip,
            name=details.name,
            version=details.version,
            arch=details.arch,
        )
    return None
|
|
|
|
|
|
def discover_hosts(
    nets: list[ipaddress.IPv4Network],
    timeout_s: float,
    workers: int,
) -> list[WledHost]:
    """Probe every host address of *nets* in parallel and collect WLED devices.

    Args:
        nets: Networks whose host addresses are probed.
        timeout_s: Per-probe HTTP timeout passed to ``probe_wled``.
        workers: Requested number of probe threads. Values below 1 are raised
            to 1, and the pool is never larger than the number of addresses.

    Returns:
        The devices that answered, sorted by numeric IP address. Addresses
        covered by more than one network are probed and reported only once.
    """
    # dict.fromkeys removes addresses shared by overlapping subnets while
    # keeping first-seen order.
    candidates = list(
        dict.fromkeys(str(host) for net in nets for host in net.hosts())
    )
    if not candidates:
        return []

    pool_size = max(1, min(workers, len(candidates)))
    found: list[WledHost] = []
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        futures = [executor.submit(probe_wled, ip, timeout_s) for ip in candidates]
        for fut in as_completed(futures):
            result = fut.result()
            if result is not None:
                found.append(result)

    # Numeric ordering, so 10.0.0.9 sorts before 10.0.0.10.
    found.sort(key=lambda h: ipaddress.IPv4Address(h.ip))
    return found
|
|
|
|
|
|
def read_targets_file(path: Path) -> list[str]:
    """Load target addresses from a UTF-8 text file.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped. From every other line only the first whitespace-separated token
    is kept, so a saved discovery list ("ip name version arch") can be fed
    back in directly.
    """
    content = path.read_text(encoding="utf-8")
    entries = (row.strip() for row in content.splitlines())
    return [
        entry.split()[0]
        for entry in entries
        if entry and not entry.startswith("#")
    ]
|
|
|
|
|
|
def write_discovery(path: Path, hosts: list[WledHost]) -> None:
    """Save *hosts* to *path* as a UTF-8 text file, one device per line.

    The parent directory is created when missing. The file starts with a
    ``# ip name version arch`` header line, followed by the four fields of
    each host separated by single spaces.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    rows = ["# ip name version arch"]
    rows.extend(f"{h.ip} {h.name} {h.version} {h.arch}" for h in hosts)
    path.write_text("".join(f"{row}\n" for row in rows), encoding="utf-8")
|
|
|
|
|
|
def wait_for_online(ip: str, timeout_s: float, interval_s: float) -> bool:
    """Poll *ip* until it answers as a WLED device or *timeout_s* elapses.

    Args:
        ip: Device address to poll.
        timeout_s: Overall time budget in seconds; when it is zero or
            negative no probe is made.
        interval_s: Pause between two probes.

    Returns:
        ``True`` as soon as a probe succeeds, ``False`` on timeout.
    """
    # Monotonic clock: a wall-clock adjustment (NTP step, DST) must neither
    # shorten nor stretch the wait.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if probe_wled(ip, timeout_s=1.2) is not None:
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(interval_s, remaining))
    return False
|
|
|
|
|
|
def wait_for_offline(ip: str, timeout_s: float, interval_s: float) -> bool:
    """Poll *ip* until it stops answering as a WLED device or time runs out.

    Args:
        ip: Device address to poll.
        timeout_s: Overall time budget in seconds; when it is zero or
            negative no probe is made.
        interval_s: Pause between two probes.

    Returns:
        ``True`` as soon as a probe fails (device unreachable or not
        answering as WLED), ``False`` if it kept answering until the timeout.
    """
    # Monotonic clock: a wall-clock adjustment (NTP step, DST) must neither
    # shorten nor stretch the wait.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if probe_wled(ip, timeout_s=1.2) is None:
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(interval_s, remaining))
    return False
|
|
|
|
|
|
def wait_for_online_info(ip: str, timeout_s: float, interval_s: float) -> WledInfo | None:
    """Poll *ip* until it answers as a WLED device and return what it reports.

    Args:
        ip: Device address to poll.
        timeout_s: Overall time budget in seconds; when it is zero or
            negative no probe is made.
        interval_s: Pause between two probes.

    Returns:
        The ``WledInfo`` of the first successful probe, or ``None`` on
        timeout.
    """
    # Monotonic clock: a wall-clock adjustment (NTP step, DST) must neither
    # shorten nor stretch the wait.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        info = probe_wled_info(ip, timeout_s=1.2)
        if info is not None:
            return info
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(interval_s, remaining))
    return None
|
|
|
|
|
|
def reboot_confirmed(before: WledInfo | None, after: WledInfo, offline_seen: bool) -> tuple[bool, str]:
    """Decide whether a device demonstrably rebooted after an upload.

    Args:
        before: Info captured before the upload, or ``None`` if unavailable.
        after: Info captured once the device answered again.
        offline_seen: Whether the device was observed unreachable in between.

    Returns:
        ``(confirmed, reason)``. An observed offline phase is sufficient
        proof. Otherwise the uptime must have dropped by more than 5 seconds
        compared to *before*; without a *before* snapshot nothing can be
        proven.
    """
    if offline_seen:
        return True, "offline transition observed"

    if before is None:
        return False, "device was not profiled before upload, and no offline transition was observed"

    previous, current = before.uptime_s, after.uptime_s
    if previous - current > 5:
        return True, f"uptime reset from {previous}s to {current}s"

    return False, f"device stayed reachable and uptime did not reset ({previous}s -> {current}s)"
|
|
|
|
|
|
def ota_flash(ip: str, firmware: Path, connect_timeout_s: float, read_timeout_s: float) -> tuple[str, str]:
    """Upload *firmware* to ``http://<ip>/update`` as a multipart form post.

    Args:
        ip: Device address.
        firmware: Firmware image to send (form field ``update``).
        connect_timeout_s: Timeout for establishing the connection.
        read_timeout_s: Timeout for waiting on the device's response.

    Returns:
        ``(status, message)`` where status is one of:

        * ``"ok"`` - the device answered below HTTP 400 and the body contains
          neither "fail" nor "error";
        * ``"uncertain"`` - the connection was established but the response
          was lost (read timeout or dropped connection); the caller has to
          verify the outcome through a reboot check;
        * ``"failed"`` - the upload definitely did not succeed.
    """
    url = f"http://{ip}/update"
    try:
        with firmware.open("rb") as fh:
            resp = requests.post(
                url,
                files={"update": (firmware.name, fh, "application/octet-stream")},
                timeout=(connect_timeout_s, read_timeout_s),
            )
    except requests.ConnectTimeout:
        # ConnectTimeout is also a ConnectionError, so it must be handled
        # first: the connection was never established, hence nothing was
        # uploaded and this is not an "uncertain" outcome.
        return "failed", "connect timeout, upload never started"
    except requests.ReadTimeout:
        # Common with WLED OTA: upload is accepted but HTTP response never arrives before reboot.
        return "uncertain", "read timeout after upload"
    except requests.ConnectionError:
        # Some devices close the socket abruptly when rebooting after successful OTA.
        return "uncertain", "connection dropped during/after upload"
    except requests.RequestException as exc:
        return "failed", f"request failed: {exc}"
    except OSError as exc:
        # Listed after the requests handlers because RequestException is
        # itself an OSError; what is left is a problem reading the firmware.
        return "failed", f"cannot read firmware file: {exc}"

    text = (resp.text or "").lower()
    if resp.status_code >= 400:
        return "failed", f"http {resp.status_code}"
    if "fail" in text or "error" in text:
        return "failed", "device reported update failure"
    return "ok", "ok"
|
|
|
|
|
|
def print_hosts(hosts: list[WledHost]) -> None:
    """Print *hosts* as a fixed-width table on stdout.

    An empty list prints a single "nothing found" line. Otherwise a count
    line, a column header and an 80-character rule precede one row per host;
    names are cut to 24 and versions to 18 characters to keep columns aligned.
    """
    if not hosts:
        print("No WLED devices found.")
        return

    table = [
        f"Found {len(hosts)} WLED device(s):",
        f"{'IP':<16} {'Name':<24} {'Version':<18} {'Arch'}",
        "-" * 80,
    ]
    table.extend(
        f"{h.ip:<16} {h.name[:24]:<24} {h.version[:18]:<18} {h.arch}"
        for h in hosts
    )
    for row in table:
        print(row)
|
|
|
|
|
|
def cmd_discover(args: argparse.Namespace) -> int:
    """Handler for the ``discover`` subcommand.

    Scans the networks given with ``--subnet`` (or the auto-detected ones),
    prints the devices found and, when ``--output`` is set, saves the list.

    Returns:
        ``0`` on success, ``2`` when there is no network to scan.
    """
    networks = parse_networks(args.subnet)
    if not networks:
        print("No private IPv4 networks found. Pass --subnet explicitly.")
        return 2

    print("Scanning networks:", ", ".join(map(str, networks)))
    discovered = discover_hosts(nets=networks, timeout_s=args.timeout, workers=args.workers)
    print_hosts(discovered)

    if not args.output:
        return 0

    destination = Path(args.output)
    write_discovery(destination, discovered)
    print(f"Saved discovery list: {destination}")
    return 0
|
|
|
|
|
|
def resolve_targets(args: argparse.Namespace) -> list[str]:
    """Build the ordered, duplicate-free list of devices to flash.

    Sources are combined in this order: the comma-separated ``--targets``
    value, the entries of ``--targets-file``, and - with ``--discover`` - the
    devices found by scanning. For repeated entries the first occurrence
    determines the position.
    """
    collected: list[str] = []

    if args.targets:
        pieces = (piece.strip() for piece in args.targets.split(","))
        collected += [piece for piece in pieces if piece]

    if args.targets_file:
        collected += read_targets_file(Path(args.targets_file))

    if args.discover:
        scan_nets = parse_networks(args.subnet)
        scanned = discover_hosts(nets=scan_nets, timeout_s=args.timeout, workers=args.workers)
        collected += [host.ip for host in scanned]

    # dict keys keep insertion order, so this drops repeats but keeps the
    # position of each first occurrence.
    return list(dict.fromkeys(collected))
|
|
|
|
|
|
def _flash_target(tag: str, ip: str, firmware: Path, args: argparse.Namespace) -> bool:
    """Flash one device and verify that it rebooted; return True on success.

    *tag* is the "[index/total]" progress prefix put in front of every line
    printed for this device.
    """
    # Snapshot taken before the upload: its uptime is the fallback proof of a
    # reboot when no offline phase is observed afterwards.
    before = probe_wled_info(ip, timeout_s=args.timeout)
    if before is not None:
        print(
            f"{tag} {ip}: current firmware {before.version}, "
            f"uptime {before.uptime_s}s, name '{before.name}'"
        )
    print(f"{tag} {ip}: uploading...")
    status, msg = ota_flash(
        ip=ip,
        firmware=firmware,
        connect_timeout_s=args.connect_timeout,
        read_timeout_s=args.upload_timeout,
    )
    if status == "failed":
        print(f"{tag} {ip}: FAILED ({msg})")
        return False

    if status == "uncertain":
        print(f"{tag} {ip}: upload response uncertain ({msg}), verifying via reboot check...")
    else:
        print(f"{tag} {ip}: uploaded, waiting {args.reboot_wait:.1f}s for reboot...")
        time.sleep(args.reboot_wait)

    offline_seen = wait_for_offline(ip=ip, timeout_s=args.offline_timeout, interval_s=0.5)
    if offline_seen:
        print(f"{tag} {ip}: reboot detected, device went offline.")
    else:
        print(f"{tag} {ip}: warning, no offline transition observed. Checking uptime reset...")

    after = wait_for_online_info(ip=ip, timeout_s=args.online_timeout, interval_s=1.0)
    if after is None:
        print(f"{tag} {ip}: no online response after upload window.")
        return False

    reboot_ok, reason = reboot_confirmed(before=before, after=after, offline_seen=offline_seen)
    if not reboot_ok:
        print(f"{tag} {ip}: FAILED (could not prove reboot: {reason})")
        return False

    print(
        f"{tag} {ip}: OK "
        f"(now {after.version}, uptime {after.uptime_s}s, {reason})"
    )
    return True


def cmd_flash(args: argparse.Namespace) -> int:
    """Handler for the ``flash`` subcommand: flash all targets one by one.

    Validates the firmware path, resolves the target list (optionally
    resuming at ``--start-from``) and flashes each device sequentially. A
    failing device does not stop the run; failures are listed at the end.

    Returns:
        ``0`` when every target was flashed and its reboot confirmed, ``1``
        when at least one target failed, ``2`` for unusable arguments
        (firmware is not a file, no targets, unknown ``--start-from``).
    """
    firmware = Path(args.firmware)
    # is_file() instead of exists(): a directory would pass exists() and then
    # break while being opened for upload.
    if not firmware.is_file():
        print(f"Firmware file not found: {firmware}")
        return 2

    targets = resolve_targets(args)
    if not targets:
        print("No targets selected. Use --targets, --targets-file, or --discover.")
        return 2
    if args.start_from:
        if args.start_from not in targets:
            print(f"--start-from target not found in target set: {args.start_from}")
            return 2
        targets = targets[targets.index(args.start_from):]

    total = len(targets)
    print(f"Flashing {total} device(s) sequentially with: {firmware}")
    failures: list[str] = []

    for idx, ip in enumerate(targets, start=1):
        if not _flash_target(f"[{idx}/{total}]", ip, firmware, args):
            failures.append(ip)

    if failures:
        print("\nFailed targets:")
        for ip in failures:
            print(f"- {ip}")
        return 1

    print("\nAll targets flashed successfully.")
    return 0
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with its two mandatory subcommands.

    ``discover`` scans networks and saves the result; ``flash`` uploads a
    firmware image to the selected devices. Each subparser stores its handler
    in ``func`` so the caller can dispatch with ``args.func(args)``.
    """
    root = argparse.ArgumentParser(description="Discover and OTA-flash WLED devices.")
    commands = root.add_subparsers(dest="cmd", required=True)

    # --- discover -----------------------------------------------------------
    discover_cmd = commands.add_parser("discover", help="Scan network(s) for WLED devices.")
    discover_cmd.add_argument(
        "--subnet",
        action="append",
        help="Subnet CIDR, repeatable (example: 192.168.1.0/24).",
    )
    discover_cmd.add_argument(
        "--timeout",
        type=float,
        default=0.8,
        help="HTTP probe timeout in seconds.",
    )
    discover_cmd.add_argument(
        "--workers",
        type=int,
        default=128,
        help="Parallel probe workers.",
    )
    discover_cmd.add_argument(
        "--output",
        default=DEFAULT_OUTPUT_FILE,
        help="Output file for discovered hosts.",
    )
    discover_cmd.set_defaults(func=cmd_discover)

    # --- flash --------------------------------------------------------------
    flash_cmd = commands.add_parser("flash", help="Flash firmware.bin to selected hosts sequentially.")
    flash_cmd.add_argument("--firmware", required=True, help="Path to firmware.bin")
    flash_cmd.add_argument("--targets", help="Comma-separated IP list")
    flash_cmd.add_argument("--targets-file", help="Text file with one IP per line")
    flash_cmd.add_argument(
        "--discover",
        action="store_true",
        help="Discover targets before flashing",
    )
    flash_cmd.add_argument(
        "--subnet",
        action="append",
        help="Subnet CIDR for discovery mode",
    )
    flash_cmd.add_argument(
        "--start-from",
        help="Start flashing from this IP within the resolved target list",
    )
    flash_cmd.add_argument(
        "--timeout",
        type=float,
        default=0.8,
        help="HTTP probe timeout in seconds",
    )
    flash_cmd.add_argument(
        "--workers",
        type=int,
        default=128,
        help="Parallel probe workers for discovery",
    )
    flash_cmd.add_argument(
        "--connect-timeout",
        type=float,
        default=5.0,
        help="HTTP connect timeout in seconds",
    )
    flash_cmd.add_argument(
        "--upload-timeout",
        type=float,
        default=90.0,
        help="HTTP upload timeout in seconds",
    )
    flash_cmd.add_argument(
        "--reboot-wait",
        type=float,
        default=10.0,
        help="Sleep after upload before online check",
    )
    flash_cmd.add_argument(
        "--offline-timeout",
        type=float,
        default=20.0,
        help="How long to wait for the device to disappear during reboot",
    )
    flash_cmd.add_argument(
        "--online-timeout",
        type=float,
        default=60.0,
        help="How long to wait for device to come back",
    )
    flash_cmd.set_defaults(func=cmd_flash)

    return root
|
|
|
|
|
|
def main() -> int:
    """Parse the command line, run the chosen subcommand, return its exit code.

    Ctrl+C during the subcommand is turned into exit code 130 together with a
    hint on how to resume with ``--start-from``.
    """
    args = build_parser().parse_args()
    try:
        exit_code = int(args.func(args))
    except KeyboardInterrupt:
        print("\nAborted by user (Ctrl+C).")
        print("Tip: rerun with --start-from <ip> to continue at a specific device.")
        return 130
    else:
        return exit_code
|
|
|
|
|
|
if __name__ == "__main__":
    import sys

    # Exported for any child process; it cannot change this interpreter,
    # which reads PYTHONUNBUFFERED only at startup.
    os.environ.setdefault("PYTHONUNBUFFERED", "1")
    # So switch the already-open streams to line buffering explicitly:
    # progress lines then appear immediately even when output is piped.
    for _stream in (sys.stdout, sys.stderr):
        _reconfigure = getattr(_stream, "reconfigure", None)
        if _reconfigure is not None:
            _reconfigure(line_buffering=True)
    raise SystemExit(main())
|