"""Art-Net output backend that sends frame colors as ArtDmx packets over UDP."""

from __future__ import annotations

from collections import defaultdict
import socket
import struct

from app.config.models import InfinityMirrorConfig
from app.core.types import PreviewFrame

from .base import OutputBackend, OutputResult

# Fixed values used when assembling ArtDmx packets.
_ARTNET_ID = b"Art-Net\x00"
_ARTNET_PORT = 6454
_OP_DMX = 0x5000
_PROTOCOL_VERSION = 14
_DMX_CHANNELS = 512


class ArtnetOutputBackend(OutputBackend):
    """Output backend that transmits tile colors as Art-Net (ArtDmx) UDP packets.

    One 512-channel DMX buffer is built per ``(controller_ip, subnet, universe)``
    combination found on the enabled tiles, and each buffer is sent as a single
    packet to UDP port 6454 of its controller.
    """

    backend_id = "artnet"
    display_name = "Art-Net"
    supports_live_output = True

    def __init__(self) -> None:
        # The socket is created lazily by start(); None means "not started".
        self._socket: socket.socket | None = None
        # Last sequence number written into a packet (0 = nothing sent yet).
        self._sequence = 0

    def start(self) -> None:
        """Create the UDP socket if it does not exist yet (safe to call repeatedly)."""
        if self._socket is None:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def stop(self) -> None:
        """Close and forget the UDP socket, if one is open."""
        if self._socket is not None:
            self._socket.close()
            self._socket = None

    def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
        """Send *frame* to every controller referenced by the enabled tiles.

        Args:
            config: Configuration providing the tiles and their Art-Net addressing.
            frame: Frame holding the per-tile, per-segment LED colors.

        Returns:
            An ``OutputResult`` with ``ok=True`` plus packet and device counts on
            success, or ``ok=False`` and the error text when an ``OSError`` is
            raised while sending.
        """
        try:
            self.start()
            packets = self._build_packets(config, frame)
            for host, subnet, universe, payload in packets:
                packet = self._create_artnet_packet(
                    subnet=subnet, universe=universe, payload=payload
                )
                self._socket.sendto(packet, (host, _ARTNET_PORT))
            # Several universes may target the same controller; count hosts once.
            device_count = len({host for host, _, _, _ in packets})
            return OutputResult(
                ok=True,
                message=f"Sent {len(packets)} Art-Net packet(s).",
                packets_sent=len(packets),
                device_count=device_count,
            )
        except OSError as exc:
            return OutputResult(ok=False, message=f"Art-Net send failed: {exc}")

    def _build_packets(
        self, config: InfinityMirrorConfig, frame: PreviewFrame
    ) -> list[tuple[str, int, int, bytes]]:
        """Collect the frame's colors into one DMX payload per destination.

        Tiles that are disabled or have no entry in ``frame.tiles`` are skipped.
        A tile without ``controller_ip`` is addressed to ``127.0.0.1``.

        Returns:
            A list of ``(host, subnet, universe, payload)`` tuples, where each
            payload is exactly 512 bytes long.
        """
        grouped: dict[tuple[str, int, int], bytearray] = defaultdict(
            lambda: bytearray(_DMX_CHANNELS)
        )
        for tile in config.sorted_tiles():
            tile_frame = frame.tiles.get(tile.tile_id)
            if not tile.enabled or tile_frame is None:
                continue
            host = tile.controller_ip or "127.0.0.1"
            channels = grouped[(host, tile.subnet, tile.universe)]
            for segment in tile.segments:
                segment_colors = tile_frame.led_pixels.get(segment.name, [])
                # start_channel is treated as 1-based; convert to a 0-based offset.
                base_index = max(0, segment.start_channel - 1)
                for led_index, color in enumerate(segment_colors):
                    channel_index = base_index + led_index * 3
                    # Stop once a full RGB triple no longer fits in the universe.
                    if channel_index + 2 >= len(channels):
                        break
                    red, green, blue = color.to_8bit_tuple()
                    channels[channel_index] = red
                    channels[channel_index + 1] = green
                    channels[channel_index + 2] = blue
        return [
            (host, subnet, universe, bytes(payload))
            for (host, subnet, universe), payload in grouped.items()
        ]

    def _create_artnet_packet(self, subnet: int, universe: int, payload: bytes) -> bytes:
        """Wrap *payload* in an ArtDmx header and advance the sequence counter.

        Args:
            subnet: Art-Net sub-net; only the low 4 bits are used.
            universe: Art-Net universe; only the low 4 bits are used.
            payload: DMX channel data placed after the header.

        Returns:
            The complete packet: ID, OpCode, ProtVer, Sequence, Physical,
            SubUni/Net, Length, followed by the payload.
        """
        # Art-Net reserves sequence 0 for "sequencing disabled", so cycle 1..255.
        self._sequence = self._sequence % 255 + 1
        port_address = ((subnet & 0x0F) << 4) | (universe & 0x0F)

        opcode = struct.pack("<H", _OP_DMX)  # OpCode is transmitted low byte first
        prot_ver = struct.pack(">H", _PROTOCOL_VERSION)  # ProtVer is high byte first
        sequence = struct.pack("B", self._sequence)
        physical = struct.pack("B", 0)
        # Low byte carries sub-net/universe (SubUni); the high byte (Net) stays 0.
        subnet_universe = struct.pack("<H", port_address)
        length = struct.pack(">H", len(payload))  # Length is high byte first
        return (
            _ARTNET_ID
            + opcode
            + prot_ver
            + sequence
            + physical
            + subnet_universe
            + length
            + payload
        )