206 lines
7.9 KiB
Python
206 lines
7.9 KiB
Python
from __future__ import annotations
|
|
|
|
import socket
|
|
import struct
|
|
import time
|
|
|
|
from app.config.models import InfinityMirrorConfig, TileConfig
|
|
from app.core.types import PreviewFrame, TileFrame
|
|
from app.network.wled import fetch_wled_info
|
|
|
|
from .base import ControllerMetrics, OutputBackend, OutputResult
|
|
|
|
DDP_DEFAULT_PORT = 4048
|
|
DDP_HEADER_LENGTH = 10
|
|
DDP_MAX_DATA_LENGTH = 1440
|
|
DDP_VERSION_1 = 0x40
|
|
DDP_PUSH_FLAG = 0x01
|
|
DDP_RGB888 = 0x0B
|
|
DDP_DEFAULT_DESTINATION_ID = 1
|
|
WLED_INFO_TIMEOUT_S = 0.25
|
|
WLED_INFO_SOURCE = "WLED /json/info leds.fps (live only)"
|
|
DDP_UNCHANGED_HOST_KEEPALIVE_S = 0.35
|
|
|
|
|
|
class DdpOutputBackend(OutputBackend):
|
|
backend_id = "ddp"
|
|
display_name = "DDP (WLED)"
|
|
supports_live_output = True
|
|
|
|
def __init__(self, port: int = DDP_DEFAULT_PORT, destination_id: int = DDP_DEFAULT_DESTINATION_ID) -> None:
|
|
self.port = int(port)
|
|
self.destination_id = int(destination_id)
|
|
self._socket: socket.socket | None = None
|
|
self._sequence = 0
|
|
self._last_payload_by_host: dict[str, bytes] = {}
|
|
self._last_payload_sent_at: dict[str, float] = {}
|
|
|
|
def start(self) -> None:
|
|
if self._socket is None:
|
|
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
def stop(self) -> None:
|
|
if self._socket is not None:
|
|
self._socket.close()
|
|
self._socket = None
|
|
self._last_payload_by_host.clear()
|
|
self._last_payload_sent_at.clear()
|
|
|
|
def controller_metrics(self, config: InfinityMirrorConfig) -> ControllerMetrics | None:
|
|
hosts = self._controller_hosts(config)
|
|
if not hosts:
|
|
return ControllerMetrics(source=WLED_INFO_SOURCE)
|
|
|
|
fps_values: list[float] = []
|
|
sampled_devices = 0
|
|
live_devices = 0
|
|
for host in hosts:
|
|
info = self._fetch_wled_info(host)
|
|
if not isinstance(info, dict):
|
|
continue
|
|
sampled_devices += 1
|
|
live = bool(info.get("live"))
|
|
leds = info.get("leds")
|
|
fps = leds.get("fps") if isinstance(leds, dict) else None
|
|
if not live:
|
|
continue
|
|
live_devices += 1
|
|
if isinstance(fps, (int, float)):
|
|
fps_values.append(float(fps))
|
|
elif isinstance(fps, str):
|
|
try:
|
|
fps_values.append(float(fps))
|
|
except ValueError:
|
|
pass
|
|
|
|
average_fps = (sum(fps_values) / len(fps_values)) if fps_values else None
|
|
return ControllerMetrics(
|
|
fps=average_fps,
|
|
live_devices=live_devices,
|
|
sampled_devices=sampled_devices,
|
|
total_devices=len(hosts),
|
|
source=WLED_INFO_SOURCE,
|
|
)
|
|
|
|
def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
|
|
try:
|
|
self.start()
|
|
packets = self._build_packets(config, frame)
|
|
for host, sequence, offset, payload, last in packets:
|
|
packet = self._create_ddp_packet(sequence=sequence, offset=offset, payload=payload, last=last)
|
|
self._socket.sendto(packet, (host, self.port))
|
|
device_count = len({host for host, _, _, _, _ in packets})
|
|
return OutputResult(
|
|
ok=True,
|
|
message=f"Sent {len(packets)} DDP packet(s).",
|
|
packets_sent=len(packets),
|
|
device_count=device_count,
|
|
)
|
|
except OSError as exc:
|
|
return OutputResult(ok=False, message=f"DDP send failed: {exc}")
|
|
|
|
def _build_packets(
|
|
self,
|
|
config: InfinityMirrorConfig,
|
|
frame: PreviewFrame,
|
|
) -> list[tuple[str, int, int, bytes, bool]]:
|
|
packets: list[tuple[str, int, int, bytes, bool]] = []
|
|
sequence = self._next_sequence()
|
|
payloads = self._filter_redundant_payloads(self._build_device_payloads(config, frame), time.perf_counter())
|
|
|
|
for host, payload in payloads.items():
|
|
for offset in range(0, len(payload), DDP_MAX_DATA_LENGTH):
|
|
chunk = payload[offset : offset + DDP_MAX_DATA_LENGTH]
|
|
last = offset + DDP_MAX_DATA_LENGTH >= len(payload)
|
|
packets.append((host, sequence, offset, chunk, last))
|
|
return packets
|
|
|
|
def _build_device_payloads(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> dict[str, bytes]:
|
|
host_payloads: dict[str, bytearray] = {}
|
|
|
|
for tile in config.sorted_tiles():
|
|
tile_frame = frame.tiles.get(tile.tile_id)
|
|
if not tile.enabled or tile_frame is None:
|
|
continue
|
|
|
|
payload = self._tile_payload(tile, tile_frame)
|
|
if not payload:
|
|
continue
|
|
|
|
host = tile.controller_ip or "127.0.0.1"
|
|
host_payloads.setdefault(host, bytearray()).extend(payload)
|
|
|
|
return {host: bytes(payload) for host, payload in host_payloads.items()}
|
|
|
|
def _tile_payload(self, tile: TileConfig, tile_frame: TileFrame) -> bytes:
|
|
required_length = tile.led_total * 3
|
|
for segment in tile.segments:
|
|
segment_colors = tile_frame.led_pixels.get(segment.name, [])
|
|
segment_end = max(0, segment.start_channel - 1) + (len(segment_colors) * 3)
|
|
required_length = max(required_length, segment_end)
|
|
|
|
if required_length <= 0:
|
|
return b""
|
|
|
|
payload = bytearray(required_length)
|
|
for segment in sorted(tile.segments, key=lambda item: (item.start_channel, item.name)):
|
|
segment_colors = tile_frame.led_pixels.get(segment.name, [])
|
|
base_index = max(0, segment.start_channel - 1)
|
|
for led_index, color in enumerate(segment_colors):
|
|
channel_index = base_index + led_index * 3
|
|
if channel_index + 2 >= len(payload):
|
|
break
|
|
red, green, blue = color.to_8bit_tuple()
|
|
payload[channel_index] = red
|
|
payload[channel_index + 1] = green
|
|
payload[channel_index + 2] = blue
|
|
|
|
return bytes(payload)
|
|
|
|
def _create_ddp_packet(self, sequence: int, offset: int, payload: bytes, last: bool) -> bytes:
|
|
header = struct.pack(
|
|
"!BBBBLH",
|
|
DDP_VERSION_1 | (DDP_PUSH_FLAG if last else 0),
|
|
sequence,
|
|
DDP_RGB888,
|
|
self.destination_id,
|
|
offset,
|
|
len(payload),
|
|
)
|
|
return header + payload
|
|
|
|
def _next_sequence(self) -> int:
|
|
self._sequence = (self._sequence % 15) + 1
|
|
return self._sequence
|
|
|
|
def _controller_hosts(self, config: InfinityMirrorConfig) -> list[str]:
|
|
return sorted({tile.controller_ip.strip() for tile in config.sorted_tiles() if tile.enabled and tile.controller_ip.strip()})
|
|
|
|
def _fetch_wled_info(self, host: str) -> dict[str, object] | None:
|
|
result = fetch_wled_info(host, timeout_s=WLED_INFO_TIMEOUT_S)
|
|
if result is None:
|
|
return None
|
|
payload, _endpoint = result
|
|
return payload
|
|
|
|
def _filter_redundant_payloads(self, host_payloads: dict[str, bytes], now: float) -> dict[str, bytes]:
|
|
filtered: dict[str, bytes] = {}
|
|
active_hosts = set(host_payloads)
|
|
|
|
for stale_host in set(self._last_payload_by_host) - active_hosts:
|
|
self._last_payload_by_host.pop(stale_host, None)
|
|
self._last_payload_sent_at.pop(stale_host, None)
|
|
|
|
for host, payload in host_payloads.items():
|
|
previous_payload = self._last_payload_by_host.get(host)
|
|
last_sent_at = self._last_payload_sent_at.get(host, 0.0)
|
|
unchanged = previous_payload == payload
|
|
keepalive_due = (now - last_sent_at) >= DDP_UNCHANGED_HOST_KEEPALIVE_S
|
|
if unchanged and not keepalive_due:
|
|
continue
|
|
filtered[host] = payload
|
|
self._last_payload_by_host[host] = payload
|
|
self._last_payload_sent_at[host] = now
|
|
|
|
return filtered
|