# Source path: RFP_Infinity-Vis/app/config/xml_mapping.py
# File-viewer metadata: 362 lines, 15 KiB, Python.

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import ipaddress
import xml.etree.ElementTree as ET
from .models import (
CalibrationConfig,
CompositionInfo,
DefaultsConfig,
InfinityMirrorConfig,
LogicalDisplayConfig,
SegmentConfig,
SourceInfo,
TileConfig,
)
@dataclass
class ValidationResult:
    """Outcome of a validation pass: the list of problems that were found."""

    # Human-readable error messages; an empty list means nothing was reported.
    errors: list[str]

    @property
    def is_valid(self) -> bool:
        """Return ``True`` when no errors were recorded, ``False`` otherwise."""
        if self.errors:
            return False
        return True
class MappingValidationError(ValueError):
    """Raised when a mapping document cannot be parsed or fails validation.

    The individual messages are kept on ``errors``; the exception text is the
    same messages joined with newlines.
    """

    def __init__(self, errors: list[str]) -> None:
        """Store *errors* and use their newline-joined form as the message."""
        message = "\n".join(errors)
        super().__init__(message)
        self.errors = errors
def _get_required(node: ET.Element, key: str) -> str:
    """Return the value of attribute *key* on *node*.

    Raises:
        MappingValidationError: If the attribute is not present.
    """
    found = node.get(key)
    if found is not None:
        return found
    raise MappingValidationError([f"Missing required attribute '{key}' on <{node.tag}>."])
def _get_int(node: ET.Element, key: str, default: int | None = None) -> int:
    """Read attribute *key* of *node* as an integer.

    Args:
        node: Element whose attribute is read.
        key: Attribute name.
        default: Value returned when the attribute is absent. ``None`` marks
            the attribute as required.

    Raises:
        MappingValidationError: If the attribute is absent and no default was
            given, or if its text is not a valid integer.
    """
    text = node.get(key)
    if text is not None:
        try:
            return int(text)
        except ValueError as exc:
            raise MappingValidationError([f"Invalid integer for '{key}' on <{node.tag}>: {text!r}"]) from exc
    if default is not None:
        return default
    raise MappingValidationError([f"Missing required integer attribute '{key}' on <{node.tag}>."])
def _get_float(node: ET.Element, key: str, default: float | None = None) -> float:
    """Read attribute *key* of *node* as a float.

    Args:
        node: Element whose attribute is read.
        key: Attribute name.
        default: Value returned when the attribute is absent. ``None`` marks
            the attribute as required.

    Raises:
        MappingValidationError: If the attribute is absent and no default was
            given, or if its text is not a valid float.
    """
    text = node.get(key)
    if text is not None:
        try:
            return float(text)
        except ValueError as exc:
            raise MappingValidationError([f"Invalid float for '{key}' on <{node.tag}>: {text!r}"]) from exc
    if default is not None:
        return default
    raise MappingValidationError([f"Missing required float attribute '{key}' on <{node.tag}>."])
def _get_bool(node: ET.Element, key: str, default: bool = False) -> bool:
    """Read attribute *key* of *node* as a boolean.

    Matching ignores case and surrounding whitespace. ``1``, ``true``, ``yes``
    and ``on`` mean ``True``; ``0``, ``false``, ``no`` and ``off`` mean
    ``False``.

    Args:
        node: Element whose attribute is read.
        key: Attribute name.
        default: Value returned when the attribute is absent.

    Raises:
        MappingValidationError: If the attribute is present but its text is
            not one of the recognized tokens. Rejecting such text (instead of
            silently treating it as ``False``) keeps a typo like
            ``enabled="ture"`` from overriding a ``True`` default, and matches
            how the integer and float readers report invalid input.
    """
    raw = node.get(key)
    if raw is None:
        return default
    token = raw.strip().lower()
    if token in {"1", "true", "yes", "on"}:
        return True
    if token in {"0", "false", "no", "off"}:
        return False
    raise MappingValidationError([f"Invalid boolean for '{key}' on <{node.tag}>: {raw!r}"])
def load_config(path: str | Path, validate: bool = True) -> InfinityMirrorConfig:
    """Load a configuration from the XML file at *path*.

    The file is read as UTF-8 text and handed to ``load_config_from_string``;
    the returned configuration has ``file_path`` set to *path* as a ``Path``.

    Args:
        path: Location of the XML file.
        validate: Forwarded to ``load_config_from_string``.
    """
    source_path = Path(path)
    loaded = load_config_from_string(source_path.read_text(encoding="utf-8"), validate=validate)
    loaded.file_path = source_path
    return loaded
def _child_or_empty(parent: ET.Element, tag: str) -> ET.Element:
    """Return the first ``<tag>`` child of *parent*, or an empty stand-in element.

    The stand-in has no attributes and no children, so attribute and text
    lookups on it fall through to the caller-supplied defaults.
    """
    child = parent.find(tag)
    # Compare against None explicitly: an Element without children is falsy.
    return ET.Element(tag) if child is None else child


def _child_text_number(
    parent: ET.Element,
    tag: str,
    convert: type[int] | type[float],
    kind: str,
    default: int | float,
) -> int | float:
    """Convert the text of child ``<tag>`` with *convert*; *default* if the child is absent.

    Raises:
        MappingValidationError: If the child exists but its text cannot be
            converted (this includes an empty element).
    """
    raw = parent.findtext(tag)
    if raw is None:
        return default
    try:
        return convert(raw)
    except ValueError as exc:
        raise MappingValidationError([f"Invalid {kind} for <{tag}> in <{parent.tag}>: {raw!r}"]) from exc


def _parse_source(source_node: ET.Element) -> SourceInfo:
    """Build ``SourceInfo`` from a ``<Source>`` element (or its empty stand-in)."""
    composition_node = _child_or_empty(source_node, "OriginalComposition")
    return SourceInfo(
        original_export=source_node.findtext("OriginalExport", default=""),
        derived_from=source_node.findtext("DerivedFrom", default=""),
        composition=CompositionInfo(
            width=_get_int(composition_node, "width", 1200),
            height=_get_int(composition_node, "height", 600),
        ),
    )


def _parse_logical_display(logical_node: ET.Element) -> LogicalDisplayConfig:
    """Build ``LogicalDisplayConfig`` from a ``<LogicalDisplay>`` element (or its stand-in)."""
    return LogicalDisplayConfig(
        rows=_get_int(logical_node, "rows", 3),
        cols=_get_int(logical_node, "cols", 6),
        preview_width=_get_int(logical_node, "previewWidth", 1200),
        preview_height=_get_int(logical_node, "previewHeight", 600),
        tile_width=_get_int(logical_node, "tileWidth", 200),
        tile_height=_get_int(logical_node, "tileHeight", 200),
    )


def _parse_defaults(defaults_node: ET.Element) -> DefaultsConfig:
    """Build ``DefaultsConfig`` from the child elements of ``<Defaults>`` (or its stand-in)."""
    return DefaultsConfig(
        protocol=defaults_node.findtext("protocol", default="artnet"),
        subnet=_child_text_number(defaults_node, "subnet", int, "integer", 0),
        color_format=defaults_node.findtext("colorFormat", default="rgb"),
        tile_behavior=defaults_node.findtext("tileBehavior", default="solid_color_per_tile"),
        global_gamma=_child_text_number(defaults_node, "globalGamma", float, "float", 2.2),
    )


def _parse_segment(segment_node: ET.Element) -> SegmentConfig:
    """Build one ``SegmentConfig`` from a ``<Segment>`` element."""
    return SegmentConfig(
        name=segment_node.get("name", ""),
        side=segment_node.get("side", ""),
        start_channel=_get_int(segment_node, "startChannel", 1),
        led_count=_get_int(segment_node, "ledCount", 0),
        orientation_rad=_get_float(segment_node, "orientationRad", 0.0),
        x0=_get_float(segment_node, "x0", 0.0),
        y0=_get_float(segment_node, "y0", 0.0),
        x1=_get_float(segment_node, "x1", 0.0),
        y1=_get_float(segment_node, "y1", 0.0),
        reverse=_get_bool(segment_node, "reverse", False),
    )


def _parse_tile(tile_node: ET.Element, default_subnet: int) -> TileConfig:
    """Build one ``TileConfig`` (calibration and segments included) from a ``<Tile>`` element.

    *default_subnet* is used when the tile has no ``subnet`` attribute.
    """
    calibration_node = _child_or_empty(tile_node, "Calibration")
    calibration = CalibrationConfig(
        brightness=_get_float(calibration_node, "brightness", 1.0),
        red_gain=_get_float(calibration_node, "redGain", 1.0),
        green_gain=_get_float(calibration_node, "greenGain", 1.0),
        blue_gain=_get_float(calibration_node, "blueGain", 1.0),
    )
    segments = [
        _parse_segment(segment_node)
        for segment_node in _child_or_empty(tile_node, "Segments").findall("Segment")
    ]
    segments.sort(key=lambda segment: (segment.start_channel, segment.name))
    return TileConfig(
        tile_id=_get_required(tile_node, "id"),
        row=_get_int(tile_node, "row"),
        col=_get_int(tile_node, "col"),
        screen_name=tile_node.get("screenName", ""),
        controller_ip=tile_node.get("ip", ""),
        controller_host=tile_node.get("controllerHost", ""),
        controller_name=tile_node.get("controllerName", ""),
        controller_mac=tile_node.get("controllerMac", ""),
        universe=_get_int(tile_node, "universe", 0),
        subnet=_get_int(tile_node, "subnet", default_subnet),
        led_total=_get_int(tile_node, "ledTotal", 0),
        x0=_get_float(tile_node, "x0", 0.0),
        y0=_get_float(tile_node, "y0", 0.0),
        x1=_get_float(tile_node, "x1", 0.0),
        y1=_get_float(tile_node, "y1", 0.0),
        enabled=_get_bool(tile_node, "enabled", True),
        calibration=calibration,
        segments=segments,
    )


def load_config_from_string(xml_text: str, validate: bool = True) -> InfinityMirrorConfig:
    """Parse *xml_text* into an ``InfinityMirrorConfig``.

    The root element must be ``<InfinityMirrorConfig>`` and must contain a
    ``<Tiles>`` element. ``<Source>``, ``<LogicalDisplay>``, ``<Defaults>``
    and each tile's ``<Calibration>``/``<Segments>`` are optional; missing
    elements or attributes fall back to built-in defaults. Segments of each
    tile are sorted by ``(start_channel, name)``.

    Every malformed number in the document, including the composition size
    and the ``<Defaults>`` ``subnet``/``globalGamma`` values, is reported as
    ``MappingValidationError`` so callers need to handle only that one type.

    Args:
        xml_text: The XML document as text.
        validate: When true, run ``validate_config`` on the parsed result and
            raise if it reports errors.

    Returns:
        The parsed configuration.

    Raises:
        MappingValidationError: On XML syntax errors, an unexpected root
            element, a missing ``<Tiles>`` element, missing required or
            malformed values, or (with *validate*) failed validation.
    """
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError as exc:
        raise MappingValidationError([f"XML parse error: {exc}"]) from exc
    if root.tag != "InfinityMirrorConfig":
        raise MappingValidationError([f"Unexpected root element: <{root.tag}>"])

    source = _parse_source(_child_or_empty(root, "Source"))
    logical_display = _parse_logical_display(_child_or_empty(root, "LogicalDisplay"))
    defaults = _parse_defaults(_child_or_empty(root, "Defaults"))

    tiles_node = root.find("Tiles")
    if tiles_node is None:
        raise MappingValidationError(["Missing <Tiles> element."])
    tiles: list[TileConfig] = [
        _parse_tile(tile_node, defaults.subnet) for tile_node in tiles_node.findall("Tile")
    ]

    config = InfinityMirrorConfig(
        name=root.get("name", "Infinity Mirror"),
        version=root.get("version", "1.0"),
        source=source,
        logical_display=logical_display,
        defaults=defaults,
        tiles=tiles,
    )
    if validate:
        result = validate_config(config)
        if not result.is_valid:
            raise MappingValidationError(result.errors)
    return config
def validate_config(config: InfinityMirrorConfig) -> ValidationResult:
    """Check *config* for inconsistencies and collect every problem found.

    Checks, in order: positive grid dimensions; per tile, a non-empty and
    unique id, row/column inside the grid, a unique grid position, a parsable
    IP address (when one is set), non-negative universe, subnet within 0..15,
    non-negative LED total and brightness factor; per segment, a non-empty
    name and positive LED count and start channel; for tiles with segments,
    that the segment LED counts add up to the tile's LED total; finally that
    the number of tiles equals ``rows * cols``.

    Returns:
        A ``ValidationResult`` holding all messages (empty when valid).
        Nothing is raised for invalid configurations.
    """
    grid = config.logical_display
    rows, cols = grid.rows, grid.cols
    problems: list[str] = []
    report = problems.append

    if rows <= 0 or cols <= 0:
        report("Logical display must have positive rows and columns.")

    used_ids: set[str] = set()
    used_cells: set[tuple[int, int]] = set()
    for tile in config.tiles:
        tile_id = tile.tile_id
        if tile_id:
            if tile_id in used_ids:
                report(f"Duplicate tile id: {tile_id}")
        else:
            report("Tile id cannot be empty.")
        used_ids.add(tile_id)

        row, col = tile.row, tile.col
        if not (1 <= row <= rows):
            report(f"{tile_id}: row {row} is outside 1..{rows}.")
        if not (1 <= col <= cols):
            report(f"{tile_id}: col {col} is outside 1..{cols}.")
        cell = (row, col)
        if cell in used_cells:
            report(f"Duplicate tile position row={row}, col={col}.")
        used_cells.add(cell)

        address = tile.controller_ip
        if address:
            # An empty address is allowed; only non-empty text must parse.
            try:
                ipaddress.ip_address(address)
            except ValueError:
                report(f"{tile_id}: invalid IP address {address!r}.")

        if tile.universe < 0:
            report(f"{tile_id}: universe must be >= 0.")
        if not (0 <= tile.subnet <= 15):
            report(f"{tile_id}: subnet must be between 0 and 15.")
        if tile.led_total < 0:
            report(f"{tile_id}: led count must be >= 0.")
        # NOTE(review): brightness_factor is not among the fields this module
        # passes to TileConfig; presumably TileConfig derives it -- confirm.
        if tile.brightness_factor < 0:
            report(f"{tile_id}: brightness factor must be >= 0.")

        led_sum = 0
        for segment in tile.segments:
            segment_name = segment.name
            if not segment_name:
                report(f"{tile_id}: segment name cannot be empty.")
            if segment.led_count <= 0:
                report(f"{tile_id}/{segment_name}: led count must be > 0.")
            if segment.start_channel <= 0:
                report(f"{tile_id}/{segment_name}: start channel must be > 0.")
            led_sum += segment.led_count
        # Tiles without segments are exempt from the sum check.
        if tile.segments and led_sum != tile.led_total:
            report(
                f"{tile_id}: ledTotal={tile.led_total} does not match segment sum {led_sum}."
            )

    expected_count = rows * cols
    actual_count = len(config.tiles)
    if actual_count != expected_count:
        report(f"Expected {expected_count} tiles for a {rows}x{cols} display, found {actual_count}.")
    return ValidationResult(problems)
def config_to_xml_string(config: InfinityMirrorConfig) -> str:
    """Serialize *config* to an indented XML document, returned as text.

    The document has an ``<InfinityMirrorConfig>`` root with ``<Source>``,
    ``<LogicalDisplay>``, ``<Defaults>`` and ``<Tiles>`` children. Tiles are
    written in the order given by ``config.sorted_tiles()``; each tile's
    segments are written sorted by ``(start_channel, name)``. The optional
    ``controllerHost``/``controllerName``/``controllerMac`` attributes are
    emitted only when non-empty.

    The XML declaration is written explicitly as UTF-8. Letting
    ``ET.tostring(..., encoding="unicode", xml_declaration=True)`` generate it
    can, on some Python versions, declare the locale's preferred encoding,
    which would contradict the UTF-8 bytes this module writes to disk.
    """
    root = ET.Element("InfinityMirrorConfig", {"name": config.name, "version": config.version})

    source_node = ET.SubElement(root, "Source")
    ET.SubElement(source_node, "OriginalExport").text = config.source.original_export
    ET.SubElement(source_node, "DerivedFrom").text = config.source.derived_from
    ET.SubElement(
        source_node,
        "OriginalComposition",
        {
            "width": str(config.source.composition.width),
            "height": str(config.source.composition.height),
        },
    )

    ET.SubElement(
        root,
        "LogicalDisplay",
        {
            "rows": str(config.logical_display.rows),
            "cols": str(config.logical_display.cols),
            "previewWidth": str(config.logical_display.preview_width),
            "previewHeight": str(config.logical_display.preview_height),
            "tileWidth": str(config.logical_display.tile_width),
            "tileHeight": str(config.logical_display.tile_height),
        },
    )

    defaults_node = ET.SubElement(root, "Defaults")
    ET.SubElement(defaults_node, "protocol").text = config.defaults.protocol
    ET.SubElement(defaults_node, "subnet").text = str(config.defaults.subnet)
    ET.SubElement(defaults_node, "colorFormat").text = config.defaults.color_format
    ET.SubElement(defaults_node, "tileBehavior").text = config.defaults.tile_behavior
    ET.SubElement(defaults_node, "globalGamma").text = str(config.defaults.global_gamma)

    tiles_node = ET.SubElement(root, "Tiles")
    for tile in config.sorted_tiles():
        tile_attributes = {
            "id": tile.tile_id,
            "row": str(tile.row),
            "col": str(tile.col),
            "screenName": tile.screen_name,
            "ip": tile.controller_ip,
            "universe": str(tile.universe),
            "subnet": str(tile.subnet),
            "ledTotal": str(tile.led_total),
            "x0": _format_float(tile.x0),
            "y0": _format_float(tile.y0),
            "x1": _format_float(tile.x1),
            "y1": _format_float(tile.y1),
            "enabled": "true" if tile.enabled else "false",
        }
        # Optional controller metadata is written only when it carries a value.
        if tile.controller_host:
            tile_attributes["controllerHost"] = tile.controller_host
        if tile.controller_name:
            tile_attributes["controllerName"] = tile.controller_name
        if tile.controller_mac:
            tile_attributes["controllerMac"] = tile.controller_mac
        tile_node = ET.SubElement(tiles_node, "Tile", tile_attributes)

        ET.SubElement(
            tile_node,
            "Calibration",
            {
                "brightness": _format_float(tile.calibration.brightness),
                "redGain": _format_float(tile.calibration.red_gain),
                "greenGain": _format_float(tile.calibration.green_gain),
                "blueGain": _format_float(tile.calibration.blue_gain),
            },
        )

        segments_node = ET.SubElement(tile_node, "Segments")
        for segment in sorted(tile.segments, key=lambda item: (item.start_channel, item.name)):
            ET.SubElement(
                segments_node,
                "Segment",
                {
                    "name": segment.name,
                    "side": segment.side,
                    "startChannel": str(segment.start_channel),
                    "ledCount": str(segment.led_count),
                    "orientationRad": _format_float(segment.orientation_rad),
                    "x0": _format_float(segment.x0),
                    "y0": _format_float(segment.y0),
                    "x1": _format_float(segment.x1),
                    "y1": _format_float(segment.y1),
                    "reverse": "true" if segment.reverse else "false",
                },
            )

    ET.indent(root, space=" ")
    # Serialize without a generated declaration and prepend a fixed one so the
    # declared encoding never depends on the interpreter version or locale.
    document = ET.tostring(root, encoding="unicode")
    return "<?xml version='1.0' encoding='utf-8'?>\n" + document
def save_config(config: InfinityMirrorConfig, path: str | Path) -> None:
    """Validate *config* and write it to *path* as UTF-8 XML.

    On success ``config.file_path`` is updated to *path* as a ``Path``.

    Raises:
        MappingValidationError: If validation reports errors; nothing is
            written in that case.
    """
    outcome = validate_config(config)
    if outcome.is_valid:
        destination = Path(path)
        destination.write_text(config_to_xml_string(config), encoding="utf-8")
        config.file_path = destination
        return
    raise MappingValidationError(outcome.errors)
def _format_float(value: float) -> str:
    """Format *value* with at most six decimals and no trailing zeros.

    Whole numbers come out without a decimal point (``100.0`` -> ``"100"``).
    Negative zero, and negative values that round to zero at six decimals,
    are written as ``"0"`` rather than ``"-0"``.
    """
    text = f"{value:.6f}".rstrip("0").rstrip(".")
    # "-0.000000" strips down to "-0"; normalize it to a plain zero.
    return "0" if text in ("", "-0") else text