Files
RFP_Infinity-Vis/app/output/artnet.py

83 lines
3.4 KiB
Python

from __future__ import annotations
from collections import defaultdict
import socket
import struct
from app.config.models import InfinityMirrorConfig
from app.core.types import PreviewFrame
from .base import OutputBackend, OutputResult
class ArtnetOutputBackend(OutputBackend):
backend_id = "artnet"
display_name = "Art-Net"
supports_live_output = True
def __init__(self) -> None:
self._socket: socket.socket | None = None
self._sequence = 0
def start(self) -> None:
if self._socket is None:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def stop(self) -> None:
if self._socket is not None:
self._socket.close()
self._socket = None
def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
try:
self.start()
packets = self._build_packets(config, frame)
for host, subnet, universe, payload in packets:
packet = self._create_artnet_packet(subnet=subnet, universe=universe, payload=payload)
self._socket.sendto(packet, (host, 6454))
device_count = len({host for host, _, _, _ in packets})
return OutputResult(
ok=True,
message=f"Sent {len(packets)} Art-Net packet(s).",
packets_sent=len(packets),
device_count=device_count,
)
except OSError as exc:
return OutputResult(ok=False, message=f"Art-Net send failed: {exc}")
def _build_packets(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> list[tuple[str, int, int, bytes]]:
grouped: dict[tuple[str, int, int], bytearray] = defaultdict(lambda: bytearray(512))
for tile in config.sorted_tiles():
tile_frame = frame.tiles.get(tile.tile_id)
if not tile.enabled or tile_frame is None:
continue
host = tile.controller_ip or "127.0.0.1"
key = (host, tile.subnet, tile.universe)
universe = grouped[key]
for segment in tile.segments:
segment_colors = tile_frame.led_pixels.get(segment.name, [])
base_index = max(0, segment.start_channel - 1)
for led_index, color in enumerate(segment_colors):
channel_index = base_index + led_index * 3
if channel_index + 2 >= len(universe):
break
red, green, blue = color.to_8bit_tuple()
universe[channel_index] = red
universe[channel_index + 1] = green
universe[channel_index + 2] = blue
return [(host, subnet, universe, bytes(payload)) for (host, subnet, universe), payload in grouped.items()]
def _create_artnet_packet(self, subnet: int, universe: int, payload: bytes) -> bytes:
self._sequence = (self._sequence + 1) % 256
header = b"Art-Net\x00"
opcode = struct.pack("<H", 0x5000)
prot_ver = struct.pack(">H", 14)
sequence = struct.pack("B", self._sequence)
physical = struct.pack("B", 0)
port_address = ((subnet & 0x0F) << 4) | (universe & 0x0F)
subnet_universe = struct.pack("<H", port_address)
length = struct.pack(">H", len(payload))
return header + opcode + prot_ver + sequence + physical + subnet_universe + length + payload