Files

206 lines
7.9 KiB
Python

from __future__ import annotations
import socket
import struct
import time
from app.config.models import InfinityMirrorConfig, TileConfig
from app.core.types import PreviewFrame, TileFrame
from app.network.wled import fetch_wled_info
from .base import ControllerMetrics, OutputBackend, OutputResult
DDP_DEFAULT_PORT = 4048
DDP_HEADER_LENGTH = 10
DDP_MAX_DATA_LENGTH = 1440
DDP_VERSION_1 = 0x40
DDP_PUSH_FLAG = 0x01
DDP_RGB888 = 0x0B
DDP_DEFAULT_DESTINATION_ID = 1
WLED_INFO_TIMEOUT_S = 0.25
WLED_INFO_SOURCE = "WLED /json/info leds.fps (live only)"
DDP_UNCHANGED_HOST_KEEPALIVE_S = 0.35
class DdpOutputBackend(OutputBackend):
backend_id = "ddp"
display_name = "DDP (WLED)"
supports_live_output = True
def __init__(self, port: int = DDP_DEFAULT_PORT, destination_id: int = DDP_DEFAULT_DESTINATION_ID) -> None:
self.port = int(port)
self.destination_id = int(destination_id)
self._socket: socket.socket | None = None
self._sequence = 0
self._last_payload_by_host: dict[str, bytes] = {}
self._last_payload_sent_at: dict[str, float] = {}
def start(self) -> None:
if self._socket is None:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def stop(self) -> None:
if self._socket is not None:
self._socket.close()
self._socket = None
self._last_payload_by_host.clear()
self._last_payload_sent_at.clear()
def controller_metrics(self, config: InfinityMirrorConfig) -> ControllerMetrics | None:
hosts = self._controller_hosts(config)
if not hosts:
return ControllerMetrics(source=WLED_INFO_SOURCE)
fps_values: list[float] = []
sampled_devices = 0
live_devices = 0
for host in hosts:
info = self._fetch_wled_info(host)
if not isinstance(info, dict):
continue
sampled_devices += 1
live = bool(info.get("live"))
leds = info.get("leds")
fps = leds.get("fps") if isinstance(leds, dict) else None
if not live:
continue
live_devices += 1
if isinstance(fps, (int, float)):
fps_values.append(float(fps))
elif isinstance(fps, str):
try:
fps_values.append(float(fps))
except ValueError:
pass
average_fps = (sum(fps_values) / len(fps_values)) if fps_values else None
return ControllerMetrics(
fps=average_fps,
live_devices=live_devices,
sampled_devices=sampled_devices,
total_devices=len(hosts),
source=WLED_INFO_SOURCE,
)
def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
try:
self.start()
packets = self._build_packets(config, frame)
for host, sequence, offset, payload, last in packets:
packet = self._create_ddp_packet(sequence=sequence, offset=offset, payload=payload, last=last)
self._socket.sendto(packet, (host, self.port))
device_count = len({host for host, _, _, _, _ in packets})
return OutputResult(
ok=True,
message=f"Sent {len(packets)} DDP packet(s).",
packets_sent=len(packets),
device_count=device_count,
)
except OSError as exc:
return OutputResult(ok=False, message=f"DDP send failed: {exc}")
def _build_packets(
self,
config: InfinityMirrorConfig,
frame: PreviewFrame,
) -> list[tuple[str, int, int, bytes, bool]]:
packets: list[tuple[str, int, int, bytes, bool]] = []
sequence = self._next_sequence()
payloads = self._filter_redundant_payloads(self._build_device_payloads(config, frame), time.perf_counter())
for host, payload in payloads.items():
for offset in range(0, len(payload), DDP_MAX_DATA_LENGTH):
chunk = payload[offset : offset + DDP_MAX_DATA_LENGTH]
last = offset + DDP_MAX_DATA_LENGTH >= len(payload)
packets.append((host, sequence, offset, chunk, last))
return packets
def _build_device_payloads(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> dict[str, bytes]:
host_payloads: dict[str, bytearray] = {}
for tile in config.sorted_tiles():
tile_frame = frame.tiles.get(tile.tile_id)
if not tile.enabled or tile_frame is None:
continue
payload = self._tile_payload(tile, tile_frame)
if not payload:
continue
host = tile.controller_ip or "127.0.0.1"
host_payloads.setdefault(host, bytearray()).extend(payload)
return {host: bytes(payload) for host, payload in host_payloads.items()}
def _tile_payload(self, tile: TileConfig, tile_frame: TileFrame) -> bytes:
required_length = tile.led_total * 3
for segment in tile.segments:
segment_colors = tile_frame.led_pixels.get(segment.name, [])
segment_end = max(0, segment.start_channel - 1) + (len(segment_colors) * 3)
required_length = max(required_length, segment_end)
if required_length <= 0:
return b""
payload = bytearray(required_length)
for segment in sorted(tile.segments, key=lambda item: (item.start_channel, item.name)):
segment_colors = tile_frame.led_pixels.get(segment.name, [])
base_index = max(0, segment.start_channel - 1)
for led_index, color in enumerate(segment_colors):
channel_index = base_index + led_index * 3
if channel_index + 2 >= len(payload):
break
red, green, blue = color.to_8bit_tuple()
payload[channel_index] = red
payload[channel_index + 1] = green
payload[channel_index + 2] = blue
return bytes(payload)
def _create_ddp_packet(self, sequence: int, offset: int, payload: bytes, last: bool) -> bytes:
header = struct.pack(
"!BBBBLH",
DDP_VERSION_1 | (DDP_PUSH_FLAG if last else 0),
sequence,
DDP_RGB888,
self.destination_id,
offset,
len(payload),
)
return header + payload
def _next_sequence(self) -> int:
self._sequence = (self._sequence % 15) + 1
return self._sequence
def _controller_hosts(self, config: InfinityMirrorConfig) -> list[str]:
return sorted({tile.controller_ip.strip() for tile in config.sorted_tiles() if tile.enabled and tile.controller_ip.strip()})
def _fetch_wled_info(self, host: str) -> dict[str, object] | None:
result = fetch_wled_info(host, timeout_s=WLED_INFO_TIMEOUT_S)
if result is None:
return None
payload, _endpoint = result
return payload
def _filter_redundant_payloads(self, host_payloads: dict[str, bytes], now: float) -> dict[str, bytes]:
filtered: dict[str, bytes] = {}
active_hosts = set(host_payloads)
for stale_host in set(self._last_payload_by_host) - active_hosts:
self._last_payload_by_host.pop(stale_host, None)
self._last_payload_sent_at.pop(stale_host, None)
for host, payload in host_payloads.items():
previous_payload = self._last_payload_by_host.get(host)
last_sent_at = self._last_payload_sent_at.get(host, 0.0)
unchanged = previous_payload == payload
keepalive_due = (now - last_sent_at) >= DDP_UNCHANGED_HOST_KEEPALIVE_S
if unchanged and not keepalive_due:
continue
filtered[host] = payload
self._last_payload_by_host[host] = payload
self._last_payload_sent_at[host] = now
return filtered