from __future__ import annotations from dataclasses import dataclass from pathlib import Path import ipaddress import xml.etree.ElementTree as ET from .models import ( CalibrationConfig, CompositionInfo, DefaultsConfig, InfinityMirrorConfig, LogicalDisplayConfig, SegmentConfig, SourceInfo, TileConfig, ) @dataclass class ValidationResult: errors: list[str] @property def is_valid(self) -> bool: return not self.errors class MappingValidationError(ValueError): def __init__(self, errors: list[str]) -> None: self.errors = errors super().__init__("\n".join(errors)) def _get_required(node: ET.Element, key: str) -> str: value = node.get(key) if value is None: raise MappingValidationError([f"Missing required attribute '{key}' on <{node.tag}>."]) return value def _get_int(node: ET.Element, key: str, default: int | None = None) -> int: raw = node.get(key) if raw is None: if default is None: raise MappingValidationError([f"Missing required integer attribute '{key}' on <{node.tag}>."]) return default try: return int(raw) except ValueError as exc: raise MappingValidationError([f"Invalid integer for '{key}' on <{node.tag}>: {raw!r}"]) from exc def _get_float(node: ET.Element, key: str, default: float | None = None) -> float: raw = node.get(key) if raw is None: if default is None: raise MappingValidationError([f"Missing required float attribute '{key}' on <{node.tag}>."]) return default try: return float(raw) except ValueError as exc: raise MappingValidationError([f"Invalid float for '{key}' on <{node.tag}>: {raw!r}"]) from exc def _get_bool(node: ET.Element, key: str, default: bool = False) -> bool: raw = node.get(key) if raw is None: return default return raw.strip().lower() in {"1", "true", "yes", "on"} def load_config(path: str | Path, validate: bool = True) -> InfinityMirrorConfig: xml_text = Path(path).read_text(encoding="utf-8") config = load_config_from_string(xml_text, validate=validate) config.file_path = Path(path) return config def load_config_from_string(xml_text: str, validate: bool = True) -> InfinityMirrorConfig: try: root = ET.fromstring(xml_text) except ET.ParseError as exc: raise MappingValidationError([f"XML parse error: {exc}"]) from exc if root.tag != "InfinityMirrorConfig": raise MappingValidationError([f"Unexpected root element: <{root.tag}>"]) source_node = root.find("Source") composition_node = source_node.find("OriginalComposition") if source_node is not None else None source = SourceInfo( original_export=source_node.findtext("OriginalExport", default="") if source_node is not None else "", derived_from=source_node.findtext("DerivedFrom", default="") if source_node is not None else "", composition=CompositionInfo( width=int(composition_node.get("width", "1200")) if composition_node is not None else 1200, height=int(composition_node.get("height", "600")) if composition_node is not None else 600, ), ) logical_node = root.find("LogicalDisplay") logical_display = LogicalDisplayConfig( rows=_get_int(logical_node, "rows", 3) if logical_node is not None else 3, cols=_get_int(logical_node, "cols", 6) if logical_node is not None else 6, preview_width=_get_int(logical_node, "previewWidth", 1200) if logical_node is not None else 1200, preview_height=_get_int(logical_node, "previewHeight", 600) if logical_node is not None else 600, tile_width=_get_int(logical_node, "tileWidth", 200) if logical_node is not None else 200, tile_height=_get_int(logical_node, "tileHeight", 200) if logical_node is not None else 200, ) defaults_node = root.find("Defaults") defaults = DefaultsConfig( protocol=defaults_node.findtext("protocol", default="artnet") if defaults_node is not None else "artnet", subnet=int(defaults_node.findtext("subnet", default="0")) if defaults_node is not None else 0, color_format=defaults_node.findtext("colorFormat", default="rgb") if defaults_node is not None else "rgb", tile_behavior=defaults_node.findtext("tileBehavior", default="solid_color_per_tile") if defaults_node is not None else "solid_color_per_tile", global_gamma=float(defaults_node.findtext("globalGamma", default="2.2")) if defaults_node is not None else 2.2, ) tiles: list[TileConfig] = [] tiles_node = root.find("Tiles") if tiles_node is None: raise MappingValidationError(["Missing element."]) for tile_node in tiles_node.findall("Tile"): calibration_node = tile_node.find("Calibration") calibration = CalibrationConfig( brightness=_get_float(calibration_node, "brightness", 1.0) if calibration_node is not None else 1.0, red_gain=_get_float(calibration_node, "redGain", 1.0) if calibration_node is not None else 1.0, green_gain=_get_float(calibration_node, "greenGain", 1.0) if calibration_node is not None else 1.0, blue_gain=_get_float(calibration_node, "blueGain", 1.0) if calibration_node is not None else 1.0, ) segments: list[SegmentConfig] = [] segments_node = tile_node.find("Segments") for segment_node in segments_node.findall("Segment") if segments_node is not None else []: segments.append( SegmentConfig( name=segment_node.get("name", ""), side=segment_node.get("side", ""), start_channel=_get_int(segment_node, "startChannel", 1), led_count=_get_int(segment_node, "ledCount", 0), orientation_rad=_get_float(segment_node, "orientationRad", 0.0), x0=_get_float(segment_node, "x0", 0.0), y0=_get_float(segment_node, "y0", 0.0), x1=_get_float(segment_node, "x1", 0.0), y1=_get_float(segment_node, "y1", 0.0), reverse=_get_bool(segment_node, "reverse", False), ) ) segments.sort(key=lambda segment: (segment.start_channel, segment.name)) tiles.append( TileConfig( tile_id=_get_required(tile_node, "id"), row=_get_int(tile_node, "row"), col=_get_int(tile_node, "col"), screen_name=tile_node.get("screenName", ""), controller_ip=tile_node.get("ip", ""), controller_host=tile_node.get("controllerHost", ""), controller_name=tile_node.get("controllerName", ""), controller_mac=tile_node.get("controllerMac", ""), universe=_get_int(tile_node, "universe", 0), subnet=_get_int(tile_node, "subnet", defaults.subnet), led_total=_get_int(tile_node, "ledTotal", 0), x0=_get_float(tile_node, "x0", 0.0), y0=_get_float(tile_node, "y0", 0.0), x1=_get_float(tile_node, "x1", 0.0), y1=_get_float(tile_node, "y1", 0.0), enabled=_get_bool(tile_node, "enabled", True), calibration=calibration, segments=segments, ) ) config = InfinityMirrorConfig( name=root.get("name", "Infinity Mirror"), version=root.get("version", "1.0"), source=source, logical_display=logical_display, defaults=defaults, tiles=tiles, ) if validate: result = validate_config(config) if not result.is_valid: raise MappingValidationError(result.errors) return config def validate_config(config: InfinityMirrorConfig) -> ValidationResult: errors: list[str] = [] rows = config.logical_display.rows cols = config.logical_display.cols if rows <= 0 or cols <= 0: errors.append("Logical display must have positive rows and columns.") seen_ids: set[str] = set() seen_positions: set[tuple[int, int]] = set() for tile in config.tiles: if not tile.tile_id: errors.append("Tile id cannot be empty.") elif tile.tile_id in seen_ids: errors.append(f"Duplicate tile id: {tile.tile_id}") seen_ids.add(tile.tile_id) if not (1 <= tile.row <= rows): errors.append(f"{tile.tile_id}: row {tile.row} is outside 1..{rows}.") if not (1 <= tile.col <= cols): errors.append(f"{tile.tile_id}: col {tile.col} is outside 1..{cols}.") position = (tile.row, tile.col) if position in seen_positions: errors.append(f"Duplicate tile position row={tile.row}, col={tile.col}.") seen_positions.add(position) if tile.controller_ip: try: ipaddress.ip_address(tile.controller_ip) except ValueError: errors.append(f"{tile.tile_id}: invalid IP address {tile.controller_ip!r}.") if tile.universe < 0: errors.append(f"{tile.tile_id}: universe must be >= 0.") if not (0 <= tile.subnet <= 15): errors.append(f"{tile.tile_id}: subnet must be between 0 and 15.") if tile.led_total < 0: errors.append(f"{tile.tile_id}: led count must be >= 0.") if tile.brightness_factor < 0: errors.append(f"{tile.tile_id}: brightness factor must be >= 0.") segment_led_total = 0 for segment in tile.segments: if not segment.name: errors.append(f"{tile.tile_id}: segment name cannot be empty.") if segment.led_count <= 0: errors.append(f"{tile.tile_id}/{segment.name}: led count must be > 0.") if segment.start_channel <= 0: errors.append(f"{tile.tile_id}/{segment.name}: start channel must be > 0.") segment_led_total += segment.led_count if tile.segments and segment_led_total != tile.led_total: errors.append( f"{tile.tile_id}: ledTotal={tile.led_total} does not match segment sum {segment_led_total}." ) expected_tiles = rows * cols if len(config.tiles) != expected_tiles: errors.append(f"Expected {expected_tiles} tiles for a {rows}x{cols} display, found {len(config.tiles)}.") return ValidationResult(errors) def config_to_xml_string(config: InfinityMirrorConfig) -> str: root = ET.Element("InfinityMirrorConfig", {"name": config.name, "version": config.version}) source_node = ET.SubElement(root, "Source") ET.SubElement(source_node, "OriginalExport").text = config.source.original_export ET.SubElement(source_node, "DerivedFrom").text = config.source.derived_from ET.SubElement( source_node, "OriginalComposition", { "width": str(config.source.composition.width), "height": str(config.source.composition.height), }, ) ET.SubElement( root, "LogicalDisplay", { "rows": str(config.logical_display.rows), "cols": str(config.logical_display.cols), "previewWidth": str(config.logical_display.preview_width), "previewHeight": str(config.logical_display.preview_height), "tileWidth": str(config.logical_display.tile_width), "tileHeight": str(config.logical_display.tile_height), }, ) defaults_node = ET.SubElement(root, "Defaults") ET.SubElement(defaults_node, "protocol").text = config.defaults.protocol ET.SubElement(defaults_node, "subnet").text = str(config.defaults.subnet) ET.SubElement(defaults_node, "colorFormat").text = config.defaults.color_format ET.SubElement(defaults_node, "tileBehavior").text = config.defaults.tile_behavior ET.SubElement(defaults_node, "globalGamma").text = str(config.defaults.global_gamma) tiles_node = ET.SubElement(root, "Tiles") for tile in config.sorted_tiles(): tile_attributes = { "id": tile.tile_id, "row": str(tile.row), "col": str(tile.col), "screenName": tile.screen_name, "ip": tile.controller_ip, "universe": str(tile.universe), "subnet": str(tile.subnet), "ledTotal": str(tile.led_total), "x0": _format_float(tile.x0), "y0": _format_float(tile.y0), "x1": _format_float(tile.x1), "y1": _format_float(tile.y1), "enabled": "true" if tile.enabled else "false", } if tile.controller_host: tile_attributes["controllerHost"] = tile.controller_host if tile.controller_name: tile_attributes["controllerName"] = tile.controller_name if tile.controller_mac: tile_attributes["controllerMac"] = tile.controller_mac tile_node = ET.SubElement(tiles_node, "Tile", tile_attributes) ET.SubElement( tile_node, "Calibration", { "brightness": _format_float(tile.calibration.brightness), "redGain": _format_float(tile.calibration.red_gain), "greenGain": _format_float(tile.calibration.green_gain), "blueGain": _format_float(tile.calibration.blue_gain), }, ) segments_node = ET.SubElement(tile_node, "Segments") for segment in sorted(tile.segments, key=lambda item: (item.start_channel, item.name)): ET.SubElement( segments_node, "Segment", { "name": segment.name, "side": segment.side, "startChannel": str(segment.start_channel), "ledCount": str(segment.led_count), "orientationRad": _format_float(segment.orientation_rad), "x0": _format_float(segment.x0), "y0": _format_float(segment.y0), "x1": _format_float(segment.x1), "y1": _format_float(segment.y1), "reverse": "true" if segment.reverse else "false", }, ) ET.indent(root, space=" ") return ET.tostring(root, encoding="unicode", xml_declaration=True) def save_config(config: InfinityMirrorConfig, path: str | Path) -> None: result = validate_config(config) if not result.is_valid: raise MappingValidationError(result.errors) Path(path).write_text(config_to_xml_string(config), encoding="utf-8") config.file_path = Path(path) def _format_float(value: float) -> str: text = f"{value:.6f}".rstrip("0").rstrip(".") return text if text else "0"