83 lines
3.4 KiB
Python
83 lines
3.4 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
import socket
|
|
import struct
|
|
|
|
from app.config.models import InfinityMirrorConfig
|
|
from app.core.types import PreviewFrame
|
|
|
|
from .base import OutputBackend, OutputResult
|
|
|
|
|
|
class ArtnetOutputBackend(OutputBackend):
|
|
backend_id = "artnet"
|
|
display_name = "Art-Net"
|
|
supports_live_output = True
|
|
|
|
def __init__(self) -> None:
|
|
self._socket: socket.socket | None = None
|
|
self._sequence = 0
|
|
|
|
def start(self) -> None:
|
|
if self._socket is None:
|
|
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
def stop(self) -> None:
|
|
if self._socket is not None:
|
|
self._socket.close()
|
|
self._socket = None
|
|
|
|
def send_frame(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> OutputResult:
|
|
try:
|
|
self.start()
|
|
packets = self._build_packets(config, frame)
|
|
for host, subnet, universe, payload in packets:
|
|
packet = self._create_artnet_packet(subnet=subnet, universe=universe, payload=payload)
|
|
self._socket.sendto(packet, (host, 6454))
|
|
device_count = len({host for host, _, _, _ in packets})
|
|
return OutputResult(
|
|
ok=True,
|
|
message=f"Sent {len(packets)} Art-Net packet(s).",
|
|
packets_sent=len(packets),
|
|
device_count=device_count,
|
|
)
|
|
except OSError as exc:
|
|
return OutputResult(ok=False, message=f"Art-Net send failed: {exc}")
|
|
|
|
def _build_packets(self, config: InfinityMirrorConfig, frame: PreviewFrame) -> list[tuple[str, int, int, bytes]]:
|
|
grouped: dict[tuple[str, int, int], bytearray] = defaultdict(lambda: bytearray(512))
|
|
|
|
for tile in config.sorted_tiles():
|
|
tile_frame = frame.tiles.get(tile.tile_id)
|
|
if not tile.enabled or tile_frame is None:
|
|
continue
|
|
host = tile.controller_ip or "127.0.0.1"
|
|
key = (host, tile.subnet, tile.universe)
|
|
universe = grouped[key]
|
|
for segment in tile.segments:
|
|
segment_colors = tile_frame.led_pixels.get(segment.name, [])
|
|
base_index = max(0, segment.start_channel - 1)
|
|
for led_index, color in enumerate(segment_colors):
|
|
channel_index = base_index + led_index * 3
|
|
if channel_index + 2 >= len(universe):
|
|
break
|
|
red, green, blue = color.to_8bit_tuple()
|
|
universe[channel_index] = red
|
|
universe[channel_index + 1] = green
|
|
universe[channel_index + 2] = blue
|
|
|
|
return [(host, subnet, universe, bytes(payload)) for (host, subnet, universe), payload in grouped.items()]
|
|
|
|
def _create_artnet_packet(self, subnet: int, universe: int, payload: bytes) -> bytes:
|
|
self._sequence = (self._sequence + 1) % 256
|
|
header = b"Art-Net\x00"
|
|
opcode = struct.pack("<H", 0x5000)
|
|
prot_ver = struct.pack(">H", 14)
|
|
sequence = struct.pack("B", self._sequence)
|
|
physical = struct.pack("B", 0)
|
|
port_address = ((subnet & 0x0F) << 4) | (universe & 0x0F)
|
|
subnet_universe = struct.pack("<H", port_address)
|
|
length = struct.pack(">H", len(payload))
|
|
return header + opcode + prot_ver + sequence + physical + subnet_universe + length + payload
|