from __future__ import annotations import socket import struct import time from app.config.models import InfinityMirrorConfig, TileConfig from app.core.types import PreviewFrame, TileFrame from app.network.wled import fetch_wled_info from .base import ControllerMetrics, OutputBackend, OutputResult DDP_DEFAULT_PORT = 4048 DDP_HEADER_LENGTH = 10 DDP_MAX_DATA_LENGTH = 1440 DDP_VERSION_1 = 0x40 DDP_PUSH_FLAG = 0x01 DDP_RGB888 = 0x0B DDP_DEFAULT_DESTINATION_ID = 1 WLED_INFO_TIMEOUT_S = 0.25 WLED_INFO_SOURCE = "WLED /json/info leds.fps (live only)" DDP_UNCHANGED_HOST_KEEPALIVE_S = 0.35 class DdpOutputBackend(OutputBackend): backend_id = "ddp" display_name = "DDP (WLED)" supports_live_output = True def __init__(self, port: int = DDP_DEFAULT_PORT, destination_id: int = DDP_DEFAULT_DESTINATION_ID) -> None: self.port = int(port) self.destination_id = int(destination_id) self._socket: socket.socket | None = None self._sequence = 0 self._last_payload_by_host: dict[str, bytes] = {} self._last_payload_sent_at: dict[str, float] = {} def start(self) -> None: if self._socket is None: self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) def stop(self) -> None: if self._socket is not None: self._socket.close() self._socket = None self._last_payload_by_host.clear() self._last_payload_sent_at.clear() def controller_metrics(self, config: InfinityMirrorConfig) -> ControllerMetrics | None: hosts = self._controller_hosts(config) if not hosts: return ControllerMetrics(source=WLED_INFO_SOURCE) fps_values: list[float] = [] sampled_devices = 0 live_devices = 0 for host in hosts: info = self._fetch_wled_info(host) if not isinstance(info, dict): continue sampled_devices += 1 live = bool(info.get("live")) leds = info.get("leds") fps = leds.get("fps") if isinstance(leds, dict) else None if not live: continue live_devices += 1 if isinstance(fps, (int, float)): fps_values.append(float(fps)) elif isinstance(fps, str): try: fps_values.append(float(fps)) except ValueError: pass average_fps = (sum(fps_values) / len(fps_values)) if fps_values else None return ControllerMetrics( fps=average_fps, live_devices=live_devices, sampled_devices=sampled_devices, total_devices=len(hosts), source=WLED_INFO_SOURCE, ) def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult: try: self.start() packets = self._build_packets(config, frame) for host, sequence, offset, payload, last in packets: packet = self._create_ddp_packet(sequence=sequence, offset=offset, payload=payload, last=last) self._socket.sendto(packet, (host, self.port)) device_count = len({host for host, _, _, _, _ in packets}) return OutputResult( ok=True, message=f"Sent {len(packets)} DDP packet(s).", packets_sent=len(packets), device_count=device_count, ) except OSError as exc: return OutputResult(ok=False, message=f"DDP send failed: {exc}") def _build_packets( self, config: InfinityMirrorConfig, frame: PreviewFrame, ) -> list[tuple[str, int, int, bytes, bool]]: packets: list[tuple[str, int, int, bytes, bool]] = [] sequence = self._next_sequence() payloads = self._filter_redundant_payloads(self._build_device_payloads(config, frame), time.perf_counter()) for host, payload in payloads.items(): for offset in range(0, len(payload), DDP_MAX_DATA_LENGTH): chunk = payload[offset : offset + DDP_MAX_DATA_LENGTH] last = offset + DDP_MAX_DATA_LENGTH >= len(payload) packets.append((host, sequence, offset, chunk, last)) return packets def _build_device_payloads(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> dict[str, bytes]: host_payloads: dict[str, bytearray] = {} for tile in config.sorted_tiles(): tile_frame = frame.tiles.get(tile.tile_id) if not tile.enabled or tile_frame is None: continue payload = self._tile_payload(tile, tile_frame) if not payload: continue host = tile.controller_ip or "127.0.0.1" host_payloads.setdefault(host, bytearray()).extend(payload) return {host: bytes(payload) for host, payload in host_payloads.items()} def _tile_payload(self, tile: TileConfig, tile_frame: TileFrame) -> bytes: required_length = tile.led_total * 3 for segment in tile.segments: segment_colors = tile_frame.led_pixels.get(segment.name, []) segment_end = max(0, segment.start_channel - 1) + (len(segment_colors) * 3) required_length = max(required_length, segment_end) if required_length <= 0: return b"" payload = bytearray(required_length) for segment in sorted(tile.segments, key=lambda item: (item.start_channel, item.name)): segment_colors = tile_frame.led_pixels.get(segment.name, []) base_index = max(0, segment.start_channel - 1) for led_index, color in enumerate(segment_colors): channel_index = base_index + led_index * 3 if channel_index + 2 >= len(payload): break red, green, blue = color.to_8bit_tuple() payload[channel_index] = red payload[channel_index + 1] = green payload[channel_index + 2] = blue return bytes(payload) def _create_ddp_packet(self, sequence: int, offset: int, payload: bytes, last: bool) -> bytes: header = struct.pack( "!BBBBLH", DDP_VERSION_1 | (DDP_PUSH_FLAG if last else 0), sequence, DDP_RGB888, self.destination_id, offset, len(payload), ) return header + payload def _next_sequence(self) -> int: self._sequence = (self._sequence % 15) + 1 return self._sequence def _controller_hosts(self, config: InfinityMirrorConfig) -> list[str]: return sorted({tile.controller_ip.strip() for tile in config.sorted_tiles() if tile.enabled and tile.controller_ip.strip()}) def _fetch_wled_info(self, host: str) -> dict[str, object] | None: result = fetch_wled_info(host, timeout_s=WLED_INFO_TIMEOUT_S) if result is None: return None payload, _endpoint = result return payload def _filter_redundant_payloads(self, host_payloads: dict[str, bytes], now: float) -> dict[str, bytes]: filtered: dict[str, bytes] = {} active_hosts = set(host_payloads) for stale_host in set(self._last_payload_by_host) - active_hosts: self._last_payload_by_host.pop(stale_host, None) self._last_payload_sent_at.pop(stale_host, None) for host, payload in host_payloads.items(): previous_payload = self._last_payload_by_host.get(host) last_sent_at = self._last_payload_sent_at.get(host, 0.0) unchanged = previous_payload == payload keepalive_due = (now - last_sent_at) >= DDP_UNCHANGED_HOST_KEEPALIVE_S if unchanged and not keepalive_due: continue filtered[host] = payload self._last_payload_by_host[host] = payload self._last_payload_sent_at[host] = now return filtered