#!/usr/bin/env python3
"""Build and OTA-update the full RFP Infinity installation.

This script intentionally delegates flashing to tools/rfp_network_flash.py so
the actual OTA path stays the existing WLED /update workflow.

Environment:
    RFP_NODE_BIN: optional directory that is prepended to PATH for the build
        and flash subprocesses. Defaults to DEFAULT_NODE_BIN; set it to an
        empty string to leave PATH untouched.
"""

from __future__ import annotations

import argparse
import ipaddress
import os
import shlex
import subprocess
import sys
from pathlib import Path

NODE_ENV = "rfp_esp32s3_wroom1_n16r8_3x106"
MASTER_ENV = "rfp_esp32s3_wroom1_n16r8_master"
NODE_FIRMWARE = Path(".pio/build") / NODE_ENV / "firmware.bin"
MASTER_FIRMWARE = Path(".pio/build") / MASTER_ENV / "firmware.bin"
DEFAULT_SUBNET = "192.168.178.0/24"
# Last-octet host numbers of the nodes and of the master inside the subnet.
NODE_HOSTS = range(11, 17)
MASTER_HOST = 10
# Directory prepended to PATH for subprocesses unless RFP_NODE_BIN overrides it.
DEFAULT_NODE_BIN = "/home/jan/Documents/RFP/Finanz_App/node/current/bin"
NODE_BIN_ENV_VAR = "RFP_NODE_BIN"


def repo_root() -> Path:
    """Return the repository root, i.e. the parent of this script's directory."""
    return Path(__file__).resolve().parents[1]


def python_executable(root: Path) -> str:
    """Return the repo-local ``.venv`` interpreter if present, else the current one."""
    local_python = root / ".venv/bin/python"
    return str(local_python) if local_python.exists() else sys.executable


def platformio_env(root: Path) -> dict[str, str]:
    """Return a copy of the process environment prepared for the subprocesses.

    The Node binary directory is put in front of PATH, and the npm cache and
    all PlatformIO directories are redirected to folders below ``root``.

    Args:
        root: Repository root that hosts ``.npm-cache`` and ``.piohome``.

    Returns:
        A new environment mapping; ``os.environ`` itself is not modified.
    """
    env = os.environ.copy()
    node_bin = env.get(NODE_BIN_ENV_VAR, DEFAULT_NODE_BIN)
    # Drop empty components: an empty PATH entry means "current directory",
    # which the naive f"{node_bin}:{PATH}" join produced for an empty PATH.
    path_entries = [entry for entry in (node_bin, env.get("PATH", "")) if entry]
    env["PATH"] = os.pathsep.join(path_entries)
    env["NPM_CONFIG_CACHE"] = str(root / ".npm-cache")
    env["PLATFORMIO_CORE_DIR"] = str(root / ".piohome")
    env["PLATFORMIO_PACKAGES_DIR"] = str(root / ".piohome/packages")
    env["PLATFORMIO_PLATFORMS_DIR"] = str(root / ".piohome/platforms")
    env["PLATFORMIO_CACHE_DIR"] = str(root / ".piohome/.cache")
    env["PLATFORMIO_BUILD_CACHE_DIR"] = str(root / ".piohome/buildcache")
    return env


def targets_from_subnet(subnet: str) -> tuple[list[str], str]:
    """Derive the node and master addresses from an IPv4 subnet.

    The first three octets of the network address are kept and the last octet
    is replaced by each entry of NODE_HOSTS and by MASTER_HOST. Host bits in
    ``subnet`` are ignored (``strict=False``).

    Args:
        subnet: IPv4 network in CIDR notation, e.g. ``"192.168.178.0/24"``.

    Returns:
        ``(node_ips, master_ip)``.

    Raises:
        ValueError: If ``subnet`` cannot be parsed or is not IPv4.
    """
    network = ipaddress.ip_network(subnet, strict=False)
    if network.version != 4:
        raise ValueError("Only IPv4 subnets are supported.")
    # An IPv4 address always renders as four dotted octets, so stripping the
    # last one needs no further validation.
    prefix = str(network.network_address).rsplit(".", 1)[0]
    return [f"{prefix}.{host}" for host in NODE_HOSTS], f"{prefix}.{MASTER_HOST}"


def run(cmd: list[str], root: Path, env: dict[str, str], dry_run: bool) -> None:
    """Echo ``cmd`` and, unless ``dry_run`` is set, execute it inside ``root``.

    Args:
        cmd: Argument vector; executed without a shell.
        root: Working directory for the command.
        env: Environment passed to the command.
        dry_run: When true the command is only printed.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    # Quote each argument so the echoed line is unambiguous, and flush so the
    # echo cannot end up behind the child's own output when stdout is piped.
    print("+ " + " ".join(shlex.quote(part) for part in cmd), flush=True)
    if dry_run:
        return
    subprocess.run(cmd, cwd=root, env=env, check=True)


def build(root: Path, env_name: str, env: dict[str, str], dry_run: bool) -> None:
    """Run ``platformio run -e env_name`` with the selected Python interpreter."""
    run(
        [python_executable(root), "-m", "platformio", "run", "-e", env_name],
        root,
        env,
        dry_run,
    )


def flash(
    root: Path,
    targets: list[str],
    firmware: Path,
    env: dict[str, str],
    dry_run: bool,
) -> None:
    """Hand ``firmware`` and ``targets`` to tools/rfp_network_flash.py.

    Does nothing when ``targets`` is empty.

    Args:
        root: Repository root; used as working directory.
        targets: IP addresses passed as one comma-separated ``--targets`` value.
        firmware: Firmware path relative to ``root``.
        env: Environment for the flashing subprocess.
        dry_run: When true the command is only printed and the firmware file
            is not required to exist.

    Raises:
        FileNotFoundError: If the firmware file is missing (not in dry-run).
        subprocess.CalledProcessError: If the flashing tool exits non-zero.
    """
    if not targets:
        return
    firmware_path = root / firmware
    if not dry_run and not firmware_path.exists():
        raise FileNotFoundError(f"Firmware does not exist: {firmware_path}")
    run(
        [
            python_executable(root),
            "tools/rfp_network_flash.py",
            "flash",
            "--targets",
            ",".join(targets),
            "--firmware",
            str(firmware),
        ],
        root,
        env,
        dry_run,
    )


def filtered_plan(
    nodes: list[str], master: str, args: argparse.Namespace
) -> list[tuple[str, str]]:
    """Build the ordered ``(ip, role)`` update plan selected by ``args``.

    Nodes come first and the master last. ``args.master_only`` drops the
    nodes, ``args.nodes_only`` drops the master, and ``args.start_from`` drops
    every entry that precedes the given IP.

    Raises:
        ValueError: If ``args.start_from`` is set but not part of the plan.
    """
    plan: list[tuple[str, str]] = []
    if not args.master_only:
        plan.extend((ip, "node") for ip in nodes)
    if not args.nodes_only:
        plan.append((master, "master"))
    if not args.start_from:
        return plan

    for index, (ip, _role) in enumerate(plan):
        if ip == args.start_from:
            return plan[index:]
    raise ValueError(
        f"--start-from {args.start_from} is not in the selected update plan."
    )


def parse_args() -> argparse.Namespace:
    """Parse the command line options of this script."""
    parser = argparse.ArgumentParser(
        description="Build and OTA-update RFP Infinity nodes, then master."
    )
    role = parser.add_mutually_exclusive_group()
    role.add_argument(
        "--nodes-only", action="store_true", help="Only build/flash nodes .11-.16"
    )
    role.add_argument(
        "--master-only", action="store_true", help="Only build/flash master .10"
    )
    parser.add_argument(
        "--start-from",
        help="Resume at this IP, for example 192.168.178.14 or 192.168.178.10",
    )
    parser.add_argument(
        "--subnet",
        default=DEFAULT_SUBNET,
        help=f"Show subnet used to derive .10-.16 targets (default: {DEFAULT_SUBNET})",
    )
    parser.add_argument(
        "--no-build",
        action="store_true",
        help="Skip PlatformIO builds and flash existing firmware.bin files",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print build and flash steps without executing them",
    )
    return parser.parse_args()


def main() -> int:
    """Build the required firmware images and flash nodes first, master last.

    Returns:
        0 on success, a non-zero exit status when the plan is invalid, a
        firmware file is missing or a build/flash command fails.
    """
    args = parse_args()
    root = repo_root()
    env = platformio_env(root)

    try:
        nodes, master = targets_from_subnet(args.subnet)
        plan = filtered_plan(nodes, master, args)
    except ValueError as exc:
        # Bad --subnet or --start-from: report it instead of a traceback.
        print(f"error: {exc}", file=sys.stderr)
        return 1

    node_targets = [ip for ip, role in plan if role == "node"]
    master_targets = [ip for ip, role in plan if role == "master"]

    if not plan:
        print("Nothing selected.")
        return 0

    print("RFP Infinity OTA update plan:")
    for ip, role in plan:
        print(f"- {ip} ({role})")
    print()

    try:
        if not args.no_build:
            if node_targets:
                print(f"Building node firmware: {NODE_ENV}")
                build(root, NODE_ENV, env, args.dry_run)
            if master_targets:
                print(f"Building master firmware: {MASTER_ENV}")
                build(root, MASTER_ENV, env, args.dry_run)

        if node_targets:
            print("Flashing nodes sequentially...")
            flash(root, node_targets, NODE_FIRMWARE, env, args.dry_run)

        if master_targets:
            print("Flashing master last...")
            flash(root, master_targets, MASTER_FIRMWARE, env, args.dry_run)
    except FileNotFoundError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except subprocess.CalledProcessError as exc:
        print(
            f"error: command failed with exit code {exc.returncode}",
            file=sys.stderr,
        )
        # A negative code means "killed by signal"; map it to a plain failure.
        return exc.returncode if exc.returncode > 0 else 1

    print("OTA update plan completed.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())