632 lines
25 KiB
Python
Executable File
632 lines
25 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Discover WLED devices in local networks and flash them sequentially via OTA.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import ipaddress
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
import requests
|
|
|
|
|
|
DEFAULT_OUTPUT_FILE = "tools/discovered_wled_hosts.txt"
|
|
|
|
|
|
@dataclass
class WledHost:
    """Identity of a single WLED device as reported by its ``/json/info`` endpoint."""

    # Address the device was probed on.
    ip: str
    # Device name; probe_wled_info() substitutes "WLED" when the device reports none.
    name: str
    # Firmware version string (the "ver" field of /json/info).
    version: str
    # Release name ("release" or "rel" field); empty when the firmware does not expose it.
    release: str
    # Architecture string ("arch" field); probe_wled_info() substitutes "-" when missing.
    arch: str
|
|
|
|
|
|
@dataclass
class WledInfo(WledHost):
    """A :class:`WledHost` extended with runtime details from the same ``/json/info`` reply."""

    # Value of the "uptime" field, used to detect a reboot by comparing before/after.
    uptime_s: int
    # The complete decoded /json/info document, kept for heuristics such as the OTA space hint.
    raw: dict[str, Any]
|
|
|
|
|
|
@dataclass
class OtaPreflight:
    """Everything gathered about one target before an OTA upload is attempted."""

    # /json/info snapshot taken before the upload, or None when the endpoint did not answer.
    info: WledInfo | None
    # Classification of the /update page as produced by probe_update_page().
    update_status: str
    # Human-readable explanation accompanying update_status.
    update_hint: str
    # Size of the firmware image on disk, in bytes.
    firmware_size: int
    # Human-readable verdict produced by the ota_space_hint() function.
    ota_space_hint: str
|
|
|
|
|
|
def _run(cmd: list[str]) -> str:
|
|
proc = subprocess.run(cmd, check=True, capture_output=True, text=True)
|
|
return proc.stdout
|
|
|
|
|
|
def local_networks() -> list[ipaddress.IPv4Network]:
    """
    Read active IPv4 interfaces from `ip` and return private subnets.

    Interfaces whose address is not private are skipped. A subnet with more
    than 2048 addresses is narrowed to the /24 around the interface address.
    Each network is returned once, in the order `ip` lists the interfaces.

    Returns an empty list when the `ip` tool is missing or exits with an
    error, so callers can tell the user to pass --subnet instead of crashing.
    """
    try:
        out = _run(["ip", "-o", "-4", "addr", "show", "scope", "global"])
    except (OSError, subprocess.CalledProcessError):
        # No iproute2 on this platform (or it failed): nothing can be auto-detected.
        return []

    nets: list[ipaddress.IPv4Network] = []
    seen: set[str] = set()
    for line in out.splitlines():
        match = re.search(r"\binet\s+(\d+\.\d+\.\d+\.\d+/\d+)\b", line)
        if not match:
            continue
        try:
            iface_ip = ipaddress.ip_interface(match.group(1))
        except ValueError:
            # The regex only checks the shape; ignore anything that is not a valid address.
            continue
        if not iface_ip.ip.is_private:
            continue
        net = iface_ip.network
        if net.num_addresses > 2048:
            # avoid accidentally scanning very large ranges by default
            net = ipaddress.ip_network(f"{iface_ip.ip}/24", strict=False)
        key = str(net)
        if key in seen:
            continue
        seen.add(key)
        nets.append(net)
    return nets
|
|
|
|
|
|
def parse_networks(raw: Iterable[str] | None) -> list[ipaddress.IPv4Network]:
    """Turn user-supplied CIDR strings into IPv4 networks.

    When *raw* is empty or ``None`` the networks are auto-detected via
    ``local_networks()``. Host bits are allowed in the input
    (``192.168.1.7/24`` becomes ``192.168.1.0/24``) and exact duplicates are
    dropped while keeping the first-seen order.

    Raises:
        ValueError: if an entry is not a valid IPv4 network. IPv6 input is
            rejected here because the rest of the tool only handles
            dotted-quad addresses.
    """
    if not raw:
        return local_networks()

    nets: list[ipaddress.IPv4Network] = []
    seen: set[ipaddress.IPv4Network] = set()
    for item in raw:
        net = ipaddress.IPv4Network(item.strip(), strict=False)
        if net in seen:
            continue
        seen.add(net)
        nets.append(net)
    return nets
|
|
|
|
|
|
def probe_wled_info(ip: str, timeout_s: float) -> WledInfo | None:
    """Fetch ``http://<ip>/json/info`` and decode it into a :class:`WledInfo`.

    Returns ``None`` when the request fails, the status is not 200, the body
    is not a JSON object, or the object has no ``ver`` field. A missing name
    becomes ``"WLED"`` and a missing architecture becomes ``"-"``. An
    ``uptime`` value that cannot be converted to an integer is treated as 0.
    """
    url = f"http://{ip}/json/info"
    try:
        resp = requests.get(url, timeout=timeout_s)
        if resp.status_code != 200:
            return None
        data = resp.json()
    except (requests.RequestException, ValueError):
        return None

    # Discovery probes arbitrary hosts, so the body may be any JSON value
    # (list, string, null). Only an object can be a WLED info document.
    if not isinstance(data, dict):
        return None

    # WLED info endpoint typically includes "name", "ver", "release"/"rel", and "arch".
    ver = str(data.get("ver", "")).strip()
    if not ver:
        return None
    release = str(data.get("release") or data.get("rel") or "").strip()
    name = str(data.get("name", "")).strip() or "WLED"
    arch = str(data.get("arch", "")).strip() or "-"

    try:
        uptime_s = int(data.get("uptime", 0) or 0)
    except (TypeError, ValueError, OverflowError):
        # A malformed uptime must not abort a whole network scan.
        uptime_s = 0
    return WledInfo(ip=ip, name=name, version=ver, release=release, arch=arch, uptime_s=uptime_s, raw=data)
|
|
|
|
|
|
def probe_wled(ip: str, timeout_s: float) -> WledHost | None:
    """Probe *ip* and return only the identity fields of the device.

    Delegates to ``probe_wled_info`` and drops the uptime and raw payload.
    Returns ``None`` when that probe returns ``None``.
    """
    details = probe_wled_info(ip, timeout_s)
    if details is not None:
        return WledHost(
            ip=details.ip,
            name=details.name,
            version=details.version,
            release=details.release,
            arch=details.arch,
        )
    return None
|
|
|
|
|
|
def discover_hosts(
    nets: list[ipaddress.IPv4Network],
    timeout_s: float,
    workers: int,
) -> list[WledHost]:
    """Probe every host address of *nets* in parallel and return the WLED devices found.

    Args:
        nets: Networks whose host addresses are probed.
        timeout_s: HTTP timeout passed to each probe.
        workers: Size of the thread pool; values below 1 are treated as 1.

    Returns:
        The responding devices, sorted numerically by IPv4 address. Each
        address is probed once even when the networks overlap.
    """
    # dict.fromkeys keeps first-seen order while removing addresses that
    # appear in more than one (overlapping or repeated) network.
    candidates = list(dict.fromkeys(str(host) for net in nets for host in net.hosts()))
    if not candidates:
        return []

    found: list[WledHost] = []
    # ThreadPoolExecutor rejects max_workers <= 0, so clamp user input.
    with ThreadPoolExecutor(max_workers=max(1, workers)) as executor:
        futures = [executor.submit(probe_wled, ip, timeout_s) for ip in candidates]
        for fut in as_completed(futures):
            result = fut.result()
            if result is not None:
                found.append(result)

    # Sort by octet values, not lexicographically ("10.0.0.9" before "10.0.0.10").
    found.sort(key=lambda h: tuple(int(part) for part in h.ip.split(".")))
    return found
|
|
|
|
|
|
def read_targets_file(path: Path) -> list[str]:
    """Load target addresses from a UTF-8 text file.

    Blank lines and lines whose first non-space character is ``#`` are
    ignored. For every other line only the first whitespace-separated token
    is kept, so files produced by ``write_discovery`` can be read back.
    """
    stripped_lines = (raw.strip() for raw in path.read_text(encoding="utf-8").splitlines())
    return [
        entry.split()[0]
        for entry in stripped_lines
        if entry and not entry.startswith("#")
    ]
|
|
|
|
|
|
def write_discovery(path: Path, hosts: list[WledHost]) -> None:
    """Write the discovered hosts to *path*, one space-separated line per device.

    Parent directories are created as needed. The file starts with a ``#``
    header line, and an empty release is written as ``-``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    rows = ["# ip name version release arch"]
    for host in hosts:
        rows.append(f"{host.ip} {host.name} {host.version} {host.release or '-'} {host.arch}")
    path.write_text("\n".join(rows) + "\n", encoding="utf-8")
|
|
|
|
|
|
def wait_for_online(ip: str, timeout_s: float, interval_s: float) -> bool:
    """Poll *ip* until it answers as a WLED device or *timeout_s* elapses.

    Each probe uses a fixed 1.2 s HTTP timeout and is followed by a pause of
    *interval_s* seconds. Returns ``True`` as soon as a probe succeeds and
    ``False`` when the deadline passes first (no probe is made when
    *timeout_s* is not positive).
    """
    # Monotonic clock: the deadline must not move when the wall clock is
    # adjusted (NTP step, manual change) while waiting.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if probe_wled(ip, timeout_s=1.2) is not None:
            return True
        time.sleep(interval_s)
    return False
|
|
|
|
|
|
def wait_for_offline(ip: str, timeout_s: float, interval_s: float) -> bool:
    """Poll *ip* until it stops answering as a WLED device or *timeout_s* elapses.

    Each probe uses a fixed 1.2 s HTTP timeout and is followed by a pause of
    *interval_s* seconds. Returns ``True`` as soon as a probe fails and
    ``False`` when the device answered every probe until the deadline (no
    probe is made when *timeout_s* is not positive).
    """
    # Monotonic clock: the deadline must not move when the wall clock is
    # adjusted (NTP step, manual change) while waiting.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if probe_wled(ip, timeout_s=1.2) is None:
            return True
        time.sleep(interval_s)
    return False
|
|
|
|
|
|
def wait_for_online_info(ip: str, timeout_s: float, interval_s: float) -> WledInfo | None:
    """Poll *ip* until ``/json/info`` can be read, then return that snapshot.

    Each probe uses a fixed 1.2 s HTTP timeout and is followed by a pause of
    *interval_s* seconds. Returns ``None`` when no probe succeeds before
    *timeout_s* elapses (no probe is made when *timeout_s* is not positive).
    """
    # Monotonic clock: the deadline must not move when the wall clock is
    # adjusted (NTP step, manual change) while waiting.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        info = probe_wled_info(ip, timeout_s=1.2)
        if info is not None:
            return info
        time.sleep(interval_s)
    return None
|
|
|
|
|
|
def reboot_confirmed(
    before: WledInfo | None,
    after: WledInfo,
    offline_seen: bool,
    transport_reset_seen: bool,
    expected_release: str | None,
) -> tuple[bool, str]:
    """Decide whether the device demonstrably rebooted after the upload.

    Evidence is checked from strongest to weakest:

    1. an observed offline transition;
    2. an uptime that dropped by more than 5 seconds compared to *before*;
    3. a transport reset during upload combined with a release that
       satisfies ``release_matches``.

    Returns:
        ``(confirmed, reason)`` where *reason* is a human-readable explanation.
    """
    if offline_seen:
        return True, "offline transition observed"

    profiled = before is not None
    if profiled and after.uptime_s + 5 < before.uptime_s:
        return True, f"uptime reset from {before.uptime_s}s to {after.uptime_s}s"

    release_ok, release_reason = release_matches(after.release, expected_release)
    weak_proof = transport_reset_seen and release_ok

    if not profiled:
        if weak_proof:
            return True, f"transport reset during upload and {release_reason}"
        return False, "device was not profiled before upload, and no offline transition was observed"

    uptime_span = f"{before.uptime_s}s -> {after.uptime_s}s"
    if weak_proof:
        return True, (
            "transport reset during upload and expected release is present "
            f"({uptime_span}; weak proof when flashing the same release)"
        )
    return False, f"device stayed reachable and uptime did not reset ({uptime_span})"
|
|
|
|
|
|
def release_matches(actual: str, expected: str | None) -> tuple[bool, str]:
    """Compare the release reported by a device with the one the user expects.

    With no expectation configured every release is accepted. Otherwise the
    comparison is an exact string match. An empty *actual* is reported as
    "not exposed" rather than as a mismatch.

    Returns:
        ``(matches, explanation)``.
    """
    if not expected:
        verdict = (True, "no expected release configured")
    elif actual == expected:
        verdict = (True, f"release matches {expected}")
    elif not actual:
        verdict = (False, f"release is not exposed; expected {expected}")
    else:
        verdict = (False, f"release mismatch: expected {expected}, got {actual}")
    return verdict
|
|
|
|
|
|
def _iter_numeric_fields(value: Any, prefix: str = "") -> Iterable[tuple[str, int]]:
|
|
if isinstance(value, dict):
|
|
for key, nested in value.items():
|
|
nested_prefix = f"{prefix}.{key}" if prefix else str(key)
|
|
yield from _iter_numeric_fields(nested, nested_prefix)
|
|
elif isinstance(value, list):
|
|
for index, nested in enumerate(value):
|
|
yield from _iter_numeric_fields(nested, f"{prefix}[{index}]")
|
|
elif isinstance(value, (int, float)) and not isinstance(value, bool):
|
|
yield prefix, int(value)
|
|
|
|
|
|
def ota_space_hint(info: WledInfo | None, firmware_size: int) -> str:
    """Produce a human-readable hint on whether the firmware fits the OTA slot.

    The raw ``/json/info`` document is searched for positive numeric fields
    whose path contains "sketch", "ota" or "update". The largest such value
    is compared against *firmware_size*. This is a heuristic: the returned
    text says so when no suitable field exists.
    """
    if info is None:
        return "unknown, /json/info was not reachable"

    memory_tokens = ("heap", "psram", "ram")
    ota_tokens = ("sketch", "ota", "update")

    slots: list[tuple[str, int]] = []
    for path, amount in _iter_numeric_fields(info.raw):
        name = path.lower()
        # Runtime memory fields such as totalheap/freeheap are not OTA slots.
        # They can contain the substring "ota" (for example "totalheap"), so
        # they are ruled out before OTA-related names are considered.
        is_memory = any(token in name for token in memory_tokens)
        if is_memory:
            continue
        is_ota_related = any(token in name for token in ota_tokens)
        if is_ota_related and amount > 0:
            slots.append((path, amount))

    # WLED does not consistently expose ESP.getFreeSketchSpace() in /json/info.
    if not slots:
        return "not exposed by this firmware; if OTA still fails, USB-clean-flash may be required"

    best_key, best_value = max(slots, key=lambda slot: slot[1])
    if best_value >= firmware_size:
        return f"{best_key}={best_value} bytes, firmware={firmware_size} bytes"
    return f"WARNING: {best_key}={best_value} bytes is smaller than firmware={firmware_size} bytes"
|
|
|
|
|
|
def probe_update_page(ip: str, timeout_s: float) -> tuple[str, str]:
    """GET ``http://<ip>/update`` and classify whether OTA looks usable.

    Returns:
        ``(status, hint)`` where *status* is one of ``"unreachable"``,
        ``"blocked"``, ``"warning"`` or ``"ok"`` and *hint* explains why.
        The body checks are plain substring matches on the lower-cased page.
    """
    try:
        resp = requests.get(f"http://{ip}/update", timeout=timeout_s)
    except requests.RequestException as exc:
        return "unreachable", f"GET /update failed: {exc}"

    page = (resp.text or "").lower()
    code = resp.status_code

    if code in (401, 403):
        return "blocked", f"HTTP {code}; OTA may be locked or authentication may be required"
    if code >= 400:
        return "warning", f"HTTP {code}"

    lock_markers = ("ota lock", "ota locked", "locked", "forbidden", "incorrect pin")
    for marker in lock_markers:
        if marker in page:
            return "blocked", "update page suggests OTA is locked or PIN/auth is required"

    form_markers = ("update", "upload", "firmware")
    for marker in form_markers:
        if marker in page:
            return "ok", "update page reachable"

    return "warning", "update page reachable, but expected upload form text was not detected"
|
|
|
|
|
|
def preflight(ip: str, firmware: Path, timeout_s: float) -> OtaPreflight:
    """Collect pre-upload diagnostics for one target.

    Reads the firmware size from disk, snapshots ``/json/info``, classifies
    the ``/update`` page and derives the OTA space hint, in that order.
    ``firmware.stat()`` raises ``OSError`` if the file cannot be accessed.
    """
    size = firmware.stat().st_size
    device_info = probe_wled_info(ip, timeout_s=timeout_s)
    status, hint = probe_update_page(ip, timeout_s=timeout_s)
    space = ota_space_hint(device_info, size)
    return OtaPreflight(
        info=device_info,
        update_status=status,
        update_hint=hint,
        firmware_size=size,
        ota_space_hint=space,
    )
|
|
|
|
|
|
def print_preflight(index: int, total: int, ip: str, result: OtaPreflight) -> None:
    """Print the preflight diagnostics of one target as four prefixed lines.

    Every line starts with ``[index/total] ip``. The first line describes
    the current firmware, or notes that ``/json/info`` was unavailable.
    """
    prefix = f"[{index}/{total}] {ip}"
    info = result.info
    if info is not None:
        first = (
            f"{prefix}: current firmware {info.version}, release '{info.release or '-'}', uptime {info.uptime_s}s, "
            f"name '{info.name}', arch '{info.arch or '-'}'"
        )
    else:
        first = f"{prefix}: /json/info unavailable"

    report = (
        first,
        f"{prefix}: firmware size {result.firmware_size} bytes",
        f"{prefix}: OTA space hint: {result.ota_space_hint}",
        f"{prefix}: /update preflight: {result.update_status} ({result.update_hint})",
    )
    for line in report:
        print(line)
|
|
|
|
|
|
def request_reboot(ip: str, timeout_s: float) -> tuple[bool, str]:
    """Ask the device to reboot by POSTing ``{"rb": true}`` to ``/json/state``.

    Returns:
        ``(sent, message)``. *sent* is ``False`` when the request raised or
        the device answered with an HTTP status of 400 or above.
    """
    try:
        resp = requests.post(f"http://{ip}/json/state", json={"rb": True}, timeout=timeout_s)
    except requests.RequestException as exc:
        return False, f"reboot request failed: {exc}"

    accepted = resp.status_code < 400
    if not accepted:
        return False, f"reboot request returned HTTP {resp.status_code}"
    return True, "reboot requested via /json/state"
|
|
|
|
|
|
def classify_update_response(text: str) -> tuple[str, str]:
    """Classify the body of a completed ``/update`` response.

    Returns ``("failed", reason)`` when the page explicitly reports a failed
    update and ``("ok", "ok")`` otherwise. A failure report takes precedence
    over any success-sounding wording (such as "rebooting") elsewhere on the
    same page. *reason* carries at most the first 180 characters of the
    whitespace-normalized body.
    """
    normalized = " ".join((text or "").split())
    lowered = normalized.lower()

    failure_markers = ("update failed", "could not activate the firmware")
    if any(marker in lowered for marker in failure_markers):
        snippet = normalized[:180]
        return "failed", f"device reported update failure: {snippet or 'empty response'}"

    # WLED message pages contain generic scripts and wording; unknown 200 OK
    # responses are verified by the reboot/release checks instead of string
    # guessing here.
    return "ok", "ok"
|
|
|
|
|
|
def ota_flash(
    ip: str,
    firmware: Path,
    connect_timeout_s: float,
    read_timeout_s: float,
    skip_validation: bool,
    backend: str,
) -> tuple[str, str]:
    """Upload *firmware* to ``http://<ip>/update`` as a multipart form.

    Args:
        ip: Target device address.
        firmware: Firmware image to send as the ``update`` form field.
        connect_timeout_s: Connect timeout (whole seconds, minimum 1, for curl).
        read_timeout_s: Read timeout for requests; total transfer limit for
            curl (whole seconds, minimum 1).
        skip_validation: Also send ``skipValidation=1``.
        backend: ``"curl"`` runs the curl binary; any other value uses ``requests``.

    Returns:
        ``(status, message)`` with *status* one of:

        * ``"ok"``: the device answered and did not report a failure;
        * ``"failed"``: the upload definitely did not succeed;
        * ``"uncertain"``: the transfer timed out, outcome unknown;
        * ``"transport_reset"``: the connection dropped during/after upload.
    """
    # Send skipValidation both as query and multipart field. Different WLED-MM
    # builds have used different request parameter paths around OTA validation.
    url = f"http://{ip}/update"
    if skip_validation:
        url += "?skipValidation=1"

    if backend == "curl":
        cmd = [
            "curl",
            "-sS",
            "--connect-timeout",
            str(max(1, int(connect_timeout_s))),
            "--max-time",
            str(max(1, int(read_timeout_s))),
            # curl exits 0 on HTTP 4xx/5xx unless told otherwise; append the
            # status code on its own last line so it can be checked the same
            # way the requests backend checks resp.status_code.
            "-w",
            "\n%{http_code}",
            "-F",
            f"update=@{firmware}",
        ]
        if skip_validation:
            cmd += ["-F", "skipValidation=1"]
        cmd.append(url)

        try:
            proc = subprocess.run(cmd, check=False, capture_output=True, text=True)
        except OSError as exc:
            return "failed", f"curl unavailable or failed to start: {exc}"

        # Split the "-w" trailer off the response body.
        body, _, status_text = proc.stdout.rpartition("\n")
        status_text = status_text.strip()
        if status_text.isdigit():
            http_status = int(status_text)
        else:
            # No trailer was written; treat all of stdout as the body.
            body, http_status = proc.stdout, 0

        combined = " ".join((body + " " + proc.stderr).split())
        snippet = combined[:180]
        if proc.returncode == 0:
            if http_status >= 400:
                return "failed", f"http {http_status}: {snippet or 'empty response'}"
            return classify_update_response(combined)
        # WLED often closes the socket or reboots before curl receives a clean
        # response. Treat transport drops as uncertain and prove success later.
        if proc.returncode in (52, 55, 56):
            return "transport_reset", f"curl exit {proc.returncode}: {snippet or 'connection dropped during/after upload'}"
        if proc.returncode == 28:
            return "uncertain", f"curl exit {proc.returncode}: {snippet or 'upload timed out'}"
        return "failed", f"curl exit {proc.returncode}: {snippet or 'empty response'}"

    try:
        with firmware.open("rb") as fh:
            resp = requests.post(
                url,
                data={"skipValidation": "1"} if skip_validation else None,
                files={"update": (firmware.name, fh, "application/octet-stream")},
                timeout=(connect_timeout_s, read_timeout_s),
            )
    except requests.ConnectTimeout as exc:
        # ConnectTimeout is also a ConnectionError, but here no connection was
        # ever established, so nothing was uploaded. It must not be reported
        # as a transport reset, which later counts as evidence of a reboot.
        return "failed", f"request failed: {exc}"
    except requests.ReadTimeout:
        # Common with WLED OTA: upload is accepted but HTTP response never arrives before reboot.
        return "uncertain", "read timeout after upload"
    except requests.ConnectionError:
        # Some devices close the socket abruptly when rebooting after successful OTA.
        return "transport_reset", "connection dropped during/after upload"
    except requests.RequestException as exc:
        return "failed", f"request failed: {exc}"
    except OSError as exc:
        # Must come after the requests handlers: RequestException derives from
        # OSError. What remains is a failure to open/read the firmware file.
        return "failed", f"cannot read firmware file: {exc}"

    text = resp.text or ""
    snippet = " ".join(text.strip().split())[:180]
    if resp.status_code >= 400:
        return "failed", f"http {resp.status_code}: {snippet or 'empty response'}"
    return classify_update_response(text)
|
|
|
|
|
|
def print_hosts(hosts: list[WledHost]) -> None:
    """Print the discovered devices as a fixed-width table.

    Name, version and release are truncated to their column widths (24, 18
    and 30 characters). An empty release is shown as ``-``. With no hosts a
    single "not found" line is printed instead of the table.
    """

    def _row(ip: str, name: str, version: str, release: str, arch: str) -> str:
        # One table line; the header goes through the same formatter.
        return f"{ip:<16} {name[:24]:<24} {version[:18]:<18} {release[:30]:<30} {arch}"

    if not hosts:
        print("No WLED devices found.")
        return

    print(f"Found {len(hosts)} WLED device(s):")
    print(_row("IP", "Name", "Version", "Release", "Arch"))
    print("-" * 112)
    for host in hosts:
        print(_row(host.ip, host.name, host.version, host.release or "-", host.arch))
|
|
|
|
|
|
def cmd_discover(args: argparse.Namespace) -> int:
    """Implement the ``discover`` sub-command.

    Scans the requested (or auto-detected) networks, prints the result table
    and, when an output path is set, saves the list to that file.

    Returns:
        0 on success, 2 when there is no network to scan.
    """
    nets = parse_networks(args.subnet)
    if not nets:
        print("No private IPv4 networks found. Pass --subnet explicitly.")
        return 2

    print("Scanning networks:", ", ".join(map(str, nets)))
    hosts = discover_hosts(nets=nets, timeout_s=args.timeout, workers=args.workers)
    print_hosts(hosts)

    if not args.output:
        return 0

    destination = Path(args.output)
    write_discovery(destination, hosts)
    print(f"Saved discovery list: {destination}")
    return 0
|
|
|
|
|
|
def resolve_targets(args: argparse.Namespace) -> list[str]:
    """Build the ordered, de-duplicated list of targets to flash.

    Sources are merged in this order: the comma-separated ``--targets``
    value, the ``--targets-file`` entries, then the hosts found by
    ``--discover``. The first occurrence of every target determines its
    position in the result.
    """
    collected: list[str] = []

    if args.targets:
        for chunk in args.targets.split(","):
            cleaned = chunk.strip()
            if cleaned:
                collected.append(cleaned)

    if args.targets_file:
        collected += read_targets_file(Path(args.targets_file))

    if args.discover:
        nets = parse_networks(args.subnet)
        discovered = discover_hosts(nets=nets, timeout_s=args.timeout, workers=args.workers)
        collected += [host.ip for host in discovered]

    # dict keys keep insertion order, so this drops repeats but keeps ordering.
    return list(dict.fromkeys(collected))
|
|
|
|
|
|
def cmd_flash(args: argparse.Namespace) -> int:
    """Implement the ``flash`` sub-command: preflight, upload and verify each target in turn.

    For every target the function prints preflight diagnostics, uploads the
    firmware, then tries to prove that the device rebooted (offline
    transition, uptime reset, or transport reset plus matching release) and,
    if requested, that the expected release is running.

    Returns:
        0 when every target passed, 1 when at least one target failed, and
        2 for usage problems (firmware path is not a file, no targets, or an
        unknown ``--start-from`` value).
    """
    firmware = Path(args.firmware)
    # is_file() rather than exists(): a directory would pass exists() and only
    # blow up later, in the middle of the run, when the upload tries to read it.
    if not firmware.is_file():
        print(f"Firmware file not found: {firmware}")
        return 2

    targets = resolve_targets(args)
    if not targets:
        print("No targets selected. Use --targets, --targets-file, or --discover.")
        return 2
    if args.start_from:
        if args.start_from not in targets:
            print(f"--start-from target not found in target set: {args.start_from}")
            return 2
        targets = targets[targets.index(args.start_from):]

    total = len(targets)
    print(f"Flashing {total} device(s) sequentially with: {firmware}")
    failures: list[str] = []

    for idx, ip in enumerate(targets, start=1):
        tag = f"[{idx}/{total}] {ip}"
        check = preflight(ip=ip, firmware=firmware, timeout_s=args.timeout)
        print_preflight(idx, total, ip, check)
        before = check.info

        if args.preflight_only:
            if before is not None and args.expect_release:
                release_ok, release_reason = release_matches(before.release, args.expect_release)
                level = "ok" if release_ok else "warning"
                print(f"{tag}: current release check: {level} ({release_reason})")
            if before is None or check.update_status in ("blocked", "unreachable"):
                print(f"{tag}: FAILED (preflight did not prove OTA readiness)")
                failures.append(ip)
            # Never upload in preflight-only mode, whatever the verdict.
            continue

        if check.update_status == "blocked" and not args.ignore_preflight_warnings:
            print(f"{tag}: FAILED (OTA preflight is blocked; use --ignore-preflight-warnings to try anyway)")
            failures.append(ip)
            continue

        if args.skip_if_release_matches and before is not None and args.expect_release:
            release_ok, release_reason = release_matches(before.release, args.expect_release)
            if release_ok:
                print(f"{tag}: SKIP ({release_reason}; use --force-same-release to reflash anyway)")
                continue

        print(f"{tag}: uploading...")
        status, msg = ota_flash(
            ip=ip,
            firmware=firmware,
            connect_timeout_s=args.connect_timeout,
            read_timeout_s=args.upload_timeout,
            skip_validation=args.skip_validation,
            backend=args.upload_backend,
        )
        if status == "failed":
            print(f"{tag}: FAILED ({msg})")
            failures.append(ip)
            continue

        transport_reset_seen = status == "transport_reset"
        if status in ("uncertain", "transport_reset"):
            print(f"{tag}: upload response uncertain ({msg}), verifying via reboot check...")
        else:
            print(f"{tag}: uploaded, waiting {args.reboot_wait:.1f}s for reboot...")
            # NOTE(review): the reviewed copy had lost its indentation; this
            # sleep is assumed to belong to the clean-upload branch only,
            # matching the message printed just above. Please confirm.
            time.sleep(args.reboot_wait)

        forced_reboot_used = False
        offline_seen = wait_for_offline(ip=ip, timeout_s=args.offline_timeout, interval_s=0.5)
        if offline_seen:
            print(f"{tag}: reboot detected, device went offline.")
        else:
            print(f"{tag}: warning, no offline transition observed. Checking uptime reset...")
            if args.force_reboot_after_upload:
                reboot_sent, reboot_msg = request_reboot(ip=ip, timeout_s=args.timeout)
                print(f"{tag}: forced reboot fallback: {reboot_msg}")
                if reboot_sent:
                    forced_reboot_used = True
                    time.sleep(args.reboot_wait)
                    offline_seen = wait_for_offline(ip=ip, timeout_s=args.offline_timeout, interval_s=0.5)
                    if offline_seen:
                        print(f"{tag}: forced reboot detected, device went offline.")

        after = wait_for_online_info(ip=ip, timeout_s=args.online_timeout, interval_s=1.0)
        if after is None:
            print(f"{tag}: no online response after upload window.")
            failures.append(ip)
            continue

        reboot_ok, reason = reboot_confirmed(
            before=before,
            after=after,
            offline_seen=offline_seen,
            transport_reset_seen=transport_reset_seen,
            expected_release=args.expect_release,
        )
        if not reboot_ok:
            print(f"{tag}: FAILED (could not prove reboot: {reason})")
            failures.append(ip)
            continue

        release_ok, release_reason = release_matches(after.release, args.expect_release)
        if not release_ok:
            print(f"{tag}: FAILED ({release_reason})")
            failures.append(ip)
            continue

        print(
            f"{tag}: OK "
            f"(now {after.version}, release '{after.release or '-'}', uptime {after.uptime_s}s, {reason}, {release_reason})"
        )
        if forced_reboot_used:
            # A forced reboot proves the device restarted, not that the new image was accepted.
            print(f"{tag}: warning, reboot was forced after an uncertain upload; verify the firmware manually in /json/info")

    if failures:
        print("\nFailed targets:")
        for ip in failures:
            print(f"- {ip}")
        return 1

    if args.preflight_only:
        print("\nAll targets passed preflight.")
    else:
        print("\nAll targets flashed successfully.")
    return 0
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with the ``discover`` and ``flash`` sub-commands.

    Each sub-parser stores its handler in the ``func`` default so that the
    caller can dispatch with ``args.func(args)``.
    """
    parser = argparse.ArgumentParser(description="Discover and OTA-flash WLED devices.")
    commands = parser.add_subparsers(dest="cmd", required=True)

    # --- discover -----------------------------------------------------------
    p_discover = commands.add_parser("discover", help="Scan network(s) for WLED devices.")
    add = p_discover.add_argument
    add("--subnet", action="append", help="Subnet CIDR, repeatable (example: 192.168.1.0/24).")
    add("--timeout", type=float, default=0.8, help="HTTP probe timeout in seconds.")
    add("--workers", type=int, default=128, help="Parallel probe workers.")
    add("--output", default=DEFAULT_OUTPUT_FILE, help="Output file for discovered hosts.")
    p_discover.set_defaults(func=cmd_discover)

    # --- flash --------------------------------------------------------------
    p_flash = commands.add_parser("flash", help="Flash firmware.bin to selected hosts sequentially.")
    add = p_flash.add_argument
    add("--firmware", required=True, help="Path to firmware.bin")
    add("--targets", help="Comma-separated IP list")
    add("--targets-file", help="Text file with one IP per line")
    add("--discover", action="store_true", help="Discover targets before flashing")
    add("--subnet", action="append", help="Subnet CIDR for discovery mode")
    add("--start-from", help="Start flashing from this IP within the resolved target list")
    add("--timeout", type=float, default=0.8, help="HTTP probe timeout in seconds")
    add("--workers", type=int, default=128, help="Parallel probe workers for discovery")
    add("--connect-timeout", type=float, default=5.0, help="HTTP connect timeout in seconds")
    add("--upload-timeout", type=float, default=90.0, help="HTTP upload timeout in seconds")
    add("--reboot-wait", type=float, default=10.0, help="Sleep after upload before online check")
    add(
        "--offline-timeout",
        type=float,
        default=20.0,
        help="How long to wait for the device to disappear during reboot",
    )
    add("--online-timeout", type=float, default=60.0, help="How long to wait for device to come back")
    add(
        "--preflight-only",
        action="store_true",
        help="Only print /json/info and /update diagnostics, do not upload",
    )
    add(
        "--ignore-preflight-warnings",
        action="store_true",
        help="Try upload even if /update preflight looks blocked",
    )
    add(
        "--force-reboot-after-upload",
        action="store_true",
        help="After uncertain upload with no offline transition, request reboot via /json/state and verify uptime",
    )
    add("--expect-release", help="Require /json/info release/rel to match this value after OTA")
    # The next two flags write to the same destination: one switches the skip on, the other off.
    add(
        "--skip-if-release-matches",
        action="store_true",
        help="Skip a target when its current release already matches --expect-release",
    )
    add(
        "--force-same-release",
        dest="skip_if_release_matches",
        action="store_false",
        help="Reflash even when the current release already matches --expect-release",
    )
    add(
        "--skip-validation",
        action="store_true",
        help="Send WLED skipValidation=1 for controlled migrations between release names",
    )
    add(
        "--upload-backend",
        choices=("curl", "requests"),
        default="curl",
        help="HTTP upload implementation (default: curl, matching WLED helper scripts)",
    )
    p_flash.set_defaults(skip_if_release_matches=False, func=cmd_flash)

    return parser
|
|
|
|
|
|
def main() -> int:
    """Parse the command line, run the selected sub-command and return its exit code.

    Ctrl+C during the sub-command is reported with a resume hint and mapped
    to exit code 130.
    """
    args = build_parser().parse_args()
    try:
        exit_code = int(args.func(args))
    except KeyboardInterrupt:
        print("\nAborted by user (Ctrl+C).")
        print("Tip: rerun with --start-from <ip> to continue at a specific device.")
        exit_code = 130
    return exit_code
|
|
|
|
|
|
if __name__ == "__main__":
    import sys

    # PYTHONUNBUFFERED is only read at interpreter start-up, so setting it here
    # cannot change this process; it is kept so child processes inherit it.
    os.environ.setdefault("PYTHONUNBUFFERED", "1")
    # Make progress lines appear immediately even when stdout is piped or
    # redirected. stdout may have been replaced by an object without
    # reconfigure(), hence the guard.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(line_buffering=True)
    raise SystemExit(main())
|