# Source: RFP_Infinity-Vis/Infinity_Vis_1/infinity_vis_1/mapping_xml.py
# (Python, 264 lines, 9.8 KiB)

from __future__ import annotations
from pathlib import Path
import ipaddress
import xml.etree.ElementTree as ET
from .models import MappingSpec, SegmentSpec, TileSpec
class MappingError(ValueError):
    """Raised when a mapping XML document cannot be parsed or fails validation."""
def _req(node: ET.Element, key: str) -> str:
value = node.get(key)
if value is None:
raise MappingError(f"Missing required attribute {key!r} on <{node.tag}>.")
return value
def _get_int(node: ET.Element | None, key: str, default: int = 0) -> int:
if node is None:
return default
raw = node.get(key)
if raw is None:
return default
try:
return int(raw)
except ValueError as exc:
raise MappingError(f"Invalid integer {raw!r} for {key!r} on <{node.tag}>.") from exc
def _get_float(node: ET.Element | None, key: str, default: float = 0.0) -> float:
if node is None:
return default
raw = node.get(key)
if raw is None:
return default
try:
return float(raw)
except ValueError as exc:
raise MappingError(f"Invalid float {raw!r} for {key!r} on <{node.tag}>.") from exc
def _get_bool(node: ET.Element | None, key: str, default: bool = False) -> bool:
if node is None:
return default
raw = node.get(key)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def load_mapping(path: str | Path) -> MappingSpec:
    """Load a mapping from the UTF-8 encoded XML file at *path*.

    The file content is parsed by ``load_mapping_from_string`` and the
    resulting mapping's ``file_path`` is set to the path it was read from.
    """
    source = Path(path)
    spec = load_mapping_from_string(source.read_text(encoding="utf-8"))
    spec.file_path = Path(source)
    return spec
def _defaults_text(defaults: ET.Element | None, tag: str, fallback: str) -> str:
    """Return the stripped text of child <tag> of <Defaults>, or *fallback* if missing or blank."""
    if defaults is None:
        return fallback
    # findtext() yields None for a missing child and "" for an empty one;
    # both cases fall back to the built-in default.
    text = (defaults.findtext(tag) or "").strip()
    return text or fallback


def _defaults_number(defaults: ET.Element | None, tag: str, fallback, convert):
    """Parse child <tag> of <Defaults> with *convert* (int/float), or return *fallback*."""
    raw = _defaults_text(defaults, tag, "")
    if not raw:
        return fallback
    try:
        return convert(raw)
    except ValueError as exc:
        # Report bad numbers the same way the attribute helpers do.
        raise MappingError(f"Invalid value {raw!r} for <{tag}> in <Defaults>.") from exc


def _parse_segment(segment_node: ET.Element) -> SegmentSpec:
    """Build a SegmentSpec from one <Segment> element."""
    return SegmentSpec(
        name=segment_node.get("name", ""),
        side=segment_node.get("side", ""),
        start_channel=_get_int(segment_node, "startChannel", 1),
        led_count=_get_int(segment_node, "ledCount", 0),
        reverse=_get_bool(segment_node, "reverse", False),
        orientation_rad=_get_float(segment_node, "orientationRad", 0.0),
        x0=_get_float(segment_node, "x0", 0.0),
        y0=_get_float(segment_node, "y0", 0.0),
        x1=_get_float(segment_node, "x1", 0.0),
        y1=_get_float(segment_node, "y1", 0.0),
    )


def _parse_tile(tile_node: ET.Element) -> TileSpec:
    """Build a TileSpec, including its sorted segments, from one <Tile> element."""
    segments_node = tile_node.find("Segments")
    segment_nodes = segments_node.findall("Segment") if segments_node is not None else []
    segments = [_parse_segment(node) for node in segment_nodes]
    # <Calibration> is optional; _get_float falls back to 1.0 when it is absent.
    calibration_node = tile_node.find("Calibration")
    return TileSpec(
        tile_id=_req(tile_node, "id"),
        row=_get_int(tile_node, "row", 0),
        col=_get_int(tile_node, "col", 0),
        led_total=_get_int(tile_node, "ledTotal", 0),
        controller_ip=tile_node.get("ip", "").strip(),
        screen_name=tile_node.get("screenName", ""),
        controller_host=tile_node.get("controllerHost", ""),
        controller_name=tile_node.get("controllerName", ""),
        controller_mac=tile_node.get("controllerMac", ""),
        universe=_get_int(tile_node, "universe", 0),
        subnet=_get_int(tile_node, "subnet", 0),
        enabled=_get_bool(tile_node, "enabled", True),
        brightness=_get_float(calibration_node, "brightness", 1.0),
        x0=_get_float(tile_node, "x0", 0.0),
        y0=_get_float(tile_node, "y0", 0.0),
        x1=_get_float(tile_node, "x1", 0.0),
        y1=_get_float(tile_node, "y1", 0.0),
        # Segments are ordered by start channel, with the name as tie-breaker.
        segments=sorted(segments, key=lambda item: (item.start_channel, item.name)),
    )


def load_mapping_from_string(xml_text: str) -> MappingSpec:
    """Parse an ``<InfinityMirrorConfig>`` XML document into a validated MappingSpec.

    Args:
        xml_text: The XML document as text.

    Returns:
        The parsed mapping, after it has passed ``validate_mapping``.

    Raises:
        MappingError: if the XML is not well-formed, the root element is not
            ``InfinityMirrorConfig``, the ``<Tiles>`` node is missing, an
            attribute or ``<Defaults>`` value cannot be converted, or
            validation fails.
    """
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError as exc:
        raise MappingError(f"XML parse error: {exc}") from exc
    if root.tag != "InfinityMirrorConfig":
        raise MappingError(f"Unexpected root element <{root.tag}>.")

    logical = root.find("LogicalDisplay")
    defaults = root.find("Defaults")
    tiles_node = root.find("Tiles")
    if tiles_node is None:
        raise MappingError("Missing <Tiles> node.")

    tiles: list[TileSpec] = [_parse_tile(node) for node in tiles_node.findall("Tile")]

    mapping = MappingSpec(
        name=root.get("name", "Infinity Vis 1"),
        rows=_get_int(logical, "rows", 3),
        cols=_get_int(logical, "cols", 6),
        tiles=tiles,
        protocol=_defaults_text(defaults, "protocol", "ddp"),
        subnet=_defaults_number(defaults, "subnet", 0, int),
        color_format=_defaults_text(defaults, "colorFormat", "rgb"),
        tile_behavior=_defaults_text(defaults, "tileBehavior", "solid_color_per_tile"),
        global_gamma=_defaults_number(defaults, "globalGamma", 2.2, float),
    )
    validate_mapping(mapping)
    return mapping
def validate_mapping(mapping: MappingSpec) -> None:
    """Check *mapping* for structural consistency.

    Verifies that the grid dimensions are positive, that the tile count
    equals rows * cols, that tile ids and (row, col) positions are unique,
    that every non-empty controller IP parses as an IP address, and that for
    tiles with segments the sum of the (non-negative) segment LED counts
    equals the tile's ``led_total``.

    Raises:
        MappingError: on the first violated rule.
    """
    if not (mapping.rows > 0 and mapping.cols > 0):
        raise MappingError("Rows and columns must be positive.")

    expected = mapping.rows * mapping.cols
    actual = len(mapping.tiles)
    if actual != expected:
        raise MappingError(
            f"Expected {expected} tiles for a {mapping.rows}x{mapping.cols} grid, found {actual}."
        )

    used_ids: set[str] = set()
    used_cells: set[tuple[int, int]] = set()
    for tile in mapping.tiles:
        if tile.tile_id in used_ids:
            raise MappingError(f"Duplicate tile id {tile.tile_id!r}.")
        used_ids.add(tile.tile_id)

        cell = (tile.row, tile.col)
        if cell in used_cells:
            raise MappingError(f"Duplicate tile position row={tile.row}, col={tile.col}.")
        used_cells.add(cell)

        # An empty controller IP is allowed; only non-empty values are checked.
        if tile.controller_ip:
            try:
                ipaddress.ip_address(tile.controller_ip)
            except ValueError as exc:
                raise MappingError(
                    f"Invalid controller IP {tile.controller_ip!r} on {tile.tile_id}."
                ) from exc

        # Tiles without segments skip the LED total cross-check.
        if not tile.segments:
            continue
        # Negative LED counts contribute nothing to the sum.
        led_sum = sum(seg.led_count for seg in tile.segments if seg.led_count > 0)
        if led_sum != tile.led_total:
            raise MappingError(
                f"{tile.tile_id}: ledTotal={tile.led_total} does not match segment sum {led_sum}."
            )
def save_mapping(mapping: MappingSpec, path: str | Path) -> None:
    """Validate *mapping* and write it to *path* as UTF-8 encoded XML.

    Validation runs first, so an invalid mapping raises before the target
    file is touched.
    """
    validate_mapping(mapping)
    target = Path(path)
    xml_text = mapping_to_xml_string(mapping)
    target.write_text(xml_text, encoding="utf-8")
def mapping_to_xml_string(mapping: MappingSpec) -> str:
    """Serialize *mapping* to an indented ``<InfinityMirrorConfig>`` XML document.

    The ``<Source>`` block, the preview/tile sizes under ``<LogicalDisplay>``
    and the red/green/blue gains under ``<Calibration>`` are written as fixed
    constants; everything else comes from *mapping*. Tiles are emitted in the
    order returned by ``mapping.ordered_tiles()``.

    Returns:
        The XML text, starting with an XML declaration that names utf-8,
        which is the encoding ``save_mapping`` uses when writing the file.
    """
    root = ET.Element("InfinityMirrorConfig", {"name": mapping.name, "version": "1.0"})

    source = ET.SubElement(root, "Source")
    ET.SubElement(source, "OriginalExport").text = "Infinity_Vis_1"
    ET.SubElement(source, "DerivedFrom").text = "Infinity_Vis_1"
    ET.SubElement(source, "OriginalComposition", {"width": "1200", "height": "600"})

    ET.SubElement(
        root,
        "LogicalDisplay",
        {
            "rows": str(mapping.rows),
            "cols": str(mapping.cols),
            "previewWidth": "1200",
            "previewHeight": "600",
            "tileWidth": "200",
            "tileHeight": "200",
        },
    )

    defaults = ET.SubElement(root, "Defaults")
    ET.SubElement(defaults, "protocol").text = mapping.protocol
    ET.SubElement(defaults, "subnet").text = str(mapping.subnet)
    ET.SubElement(defaults, "colorFormat").text = mapping.color_format
    ET.SubElement(defaults, "tileBehavior").text = mapping.tile_behavior
    ET.SubElement(defaults, "globalGamma").text = _fmt(mapping.global_gamma)

    tiles_node = ET.SubElement(root, "Tiles")
    for tile in mapping.ordered_tiles():
        attrs = {
            "id": tile.tile_id,
            "row": str(tile.row),
            "col": str(tile.col),
            "screenName": tile.screen_name,
            "ip": tile.controller_ip,
            "universe": str(tile.universe),
            "subnet": str(tile.subnet),
            "ledTotal": str(tile.led_total),
            "x0": _fmt(tile.x0),
            "y0": _fmt(tile.y0),
            "x1": _fmt(tile.x1),
            "y1": _fmt(tile.y1),
            "enabled": "true" if tile.enabled else "false",
        }
        # Optional controller metadata is only written when it is non-empty.
        if tile.controller_host:
            attrs["controllerHost"] = tile.controller_host
        if tile.controller_name:
            attrs["controllerName"] = tile.controller_name
        if tile.controller_mac:
            attrs["controllerMac"] = tile.controller_mac
        tile_node = ET.SubElement(tiles_node, "Tile", attrs)

        ET.SubElement(
            tile_node,
            "Calibration",
            {
                "brightness": _fmt(tile.brightness),
                "redGain": "1",
                "greenGain": "1",
                "blueGain": "1",
            },
        )

        segments_node = ET.SubElement(tile_node, "Segments")
        for segment in tile.segments:
            ET.SubElement(
                segments_node,
                "Segment",
                {
                    "name": segment.name,
                    "side": segment.side,
                    "startChannel": str(segment.start_channel),
                    "ledCount": str(segment.led_count),
                    "orientationRad": _fmt(segment.orientation_rad),
                    "x0": _fmt(segment.x0),
                    "y0": _fmt(segment.y0),
                    "x1": _fmt(segment.x1),
                    "y1": _fmt(segment.y1),
                    "reverse": "true" if segment.reverse else "false",
                },
            )

    ET.indent(root, space=" ")
    # Write the declaration by hand. With encoding="unicode", tostring() may,
    # depending on the Python version, fill the declaration with the locale's
    # preferred encoding (e.g. cp1252), which would contradict the UTF-8 bytes
    # that save_mapping writes to disk.
    body = ET.tostring(root, encoding="unicode")
    return "<?xml version='1.0' encoding='utf-8'?>\n" + body
def _fmt(value: float) -> str:
text = f"{float(value):.6f}".rstrip("0").rstrip(".")
return text if text else "0"