Files
RFP_Infinity-Vis/app/ui/settings_dialog.py

834 lines
38 KiB
Python

from __future__ import annotations
import threading
from app.qt_compat import QComboBox, QDialog, QDialogButtonBox, QFormLayout, QGroupBox, QHBoxLayout, QHeaderView, QLabel, QLineEdit, QListWidget, QMessageBox, QObject, QPushButton, QPlainTextEdit, QSpinBox, QTabWidget, QTableWidget, QTableWidgetItem, Qt, QVBoxLayout, QWidget, Signal
from app.config.device_assignment import assign_device_to_tile, find_tile_for_device, tile_matches_device
from app.config.models import InfinityMirrorConfig, SegmentConfig
from app.config.xml_mapping import MappingValidationError, config_to_xml_string, load_config_from_string, save_config, validate_config
from app.network.wled import DiscoveredWledDevice, build_scan_hosts, discover_wled_devices, identify_wled_device
from app.ui.mapping_assignment_preview import MappingAssignmentPreview
class NetworkScanWorker(QObject):
    """Runs WLED discovery on a daemon thread and reports through Qt signals.

    ``progress`` carries ``(completed, total, device_or_None)``.
    ``finished`` carries ``(devices, error_message, host_count)``; the error
    message is empty on success.
    """

    progress = Signal(int, int, object)
    finished = Signal(object, str, int)

    def __init__(self, config: InfinityMirrorConfig) -> None:
        super().__init__()
        # Hold our own clone rather than the caller's config object.
        self._config = config.clone()

    def start(self) -> None:
        """Launch the scan thread and return immediately."""
        threading.Thread(
            target=self._run,
            name="InfinityMirrorNetworkScan",
            daemon=True,
        ).start()

    def _run(self) -> None:
        """Thread body: build the host list, probe it, emit ``finished``."""
        try:
            scan_hosts = build_scan_hosts(self._config)
            found = discover_wled_devices(scan_hosts, progress_callback=self._emit_progress)
            self.finished.emit(found, "", len(scan_hosts))
        except Exception as exc:
            # Any failure is reported as an empty result plus the error text.
            self.finished.emit([], str(exc), 0)

    def _emit_progress(self, completed: int, total: int, device: DiscoveredWledDevice | None) -> None:
        """Forward a discovery progress callback as the ``progress`` signal."""
        self.progress.emit(completed, total, device)
class DeviceIdentifyWorker(QObject):
    """Calls ``identify_wled_device`` for one device on a daemon thread.

    ``finished`` carries ``(device, error_message)``; the message is empty
    when the identify call returned without raising.
    """

    finished = Signal(object, str)

    def __init__(self, device: DiscoveredWledDevice) -> None:
        super().__init__()
        self._device = device

    def start(self) -> None:
        """Launch the identify thread and return immediately."""
        threading.Thread(
            target=self._run,
            name=f"WledIdentify-{self._device.ip_address}",
            daemon=True,
        ).start()

    def _run(self) -> None:
        """Thread body: run the identify call and emit exactly one result."""
        error_message = ""
        try:
            identify_wled_device(self._device.ip_address, led_count=self._device.led_count)
        except Exception as exc:
            error_message = str(exc)
        self.finished.emit(self._device, error_message)
class SettingsDialog(QDialog):
    """"Mapping Settings" dialog for editing a mapping configuration.

    The dialog works on a clone of the configuration it is given and offers
    four tabs: Tiles, Network Mapping, Segments and Raw XML. After a
    successful Save the edited configuration is exposed as ``result_config``.
    """

    # (field key, header label) pairs for the tiles table. The key is used to
    # locate a column by name; the label is shown in the table header.
    TILE_COLUMNS = [
        ("tile_id", "Tile ID"),
        ("row", "Row"),
        ("col", "Col"),
        ("screen_name", "Screen Name"),
        ("controller_ip", "Controller IP"),
        ("controller_name", "Controller Name"),
        ("controller_host", "Controller Host"),
        ("controller_mac", "Controller MAC"),
        ("subnet", "Subnet"),
        ("universe", "Universe"),
        ("led_total", "LED Count"),
        ("brightness_factor", "Brightness"),
        ("enabled", "Enabled"),
    ]
    # (field key, header label) pairs for the segments table, same scheme.
    SEGMENT_COLUMNS = [
        ("name", "Name"),
        ("side", "Side"),
        ("start_channel", "Start Ch"),
        ("led_count", "LED Count"),
        ("orientation_rad", "Orientation"),
        ("x0", "x0"),
        ("y0", "y0"),
        ("x1", "x1"),
        ("y1", "y1"),
        ("reverse", "Reverse"),
    ]
    # Header labels for the discovered-devices table. The last two columns
    # ("Map", "Identify") hold per-row buttons rather than text items.
    DEVICE_COLUMNS = [
        "IP Address",
        "Host / mDNS",
        "WLED Name",
        "MAC Address",
        "Status",
        "Map",
        "Identify",
    ]
def __init__(self, config: InfinityMirrorConfig, controller=None, parent: QWidget | None = None) -> None:
    """Build the dialog around a clone of ``config``.

    ``controller`` is optional; when present its ``mapping_path`` and
    ``replace_config`` are used while persisting assignments. ``parent`` is
    handed to ``QDialog``.
    """
    super().__init__(parent)
    self.setWindowTitle("Mapping Settings")
    self.resize(1320, 900)

    # Dialog state.
    self.controller = controller
    self.working_config = config.clone()
    self.result_config: InfinityMirrorConfig | None = None
    self._active_segment_tile_id: str | None = None
    self._device_table_refreshing = False
    self._active_device_key: str | None = None
    self._selected_assignment_tile_id: str | None = None
    self._scan_worker: NetworkScanWorker | None = None
    self._identify_workers: set[DeviceIdentifyWorker] = set()
    self.discovered_devices: list[DiscoveredWledDevice] = []
    self._last_scan_host_count = 0

    def grid_spin(initial: int) -> QSpinBox:
        # Rows and columns share the same 1..24 range.
        spin = QSpinBox()
        spin.setRange(1, 24)
        spin.setValue(initial)
        return spin

    # Header form: mapping name plus logical grid size.
    root_layout = QVBoxLayout(self)
    header_group = QGroupBox("Mapping")
    header_form = QFormLayout(header_group)
    self.name_edit = QLineEdit(self.working_config.name)
    self.rows_spin = grid_spin(self.working_config.logical_display.rows)
    self.cols_spin = grid_spin(self.working_config.logical_display.cols)
    for caption, editor in (
        ("Name", self.name_edit),
        ("Rows", self.rows_spin),
        ("Cols", self.cols_spin),
    ):
        header_form.addRow(caption, editor)
    root_layout.addWidget(header_group)

    # Tab container; the change handler is connected before pages are added.
    self.tabs = QTabWidget()
    self.tabs.currentChanged.connect(self._handle_tab_changed)
    root_layout.addWidget(self.tabs, 1)
    self.tiles_tab = QWidget()
    self.network_tab = QWidget()
    self.segments_tab = QWidget()
    self.raw_tab = QWidget()
    for page, title in (
        (self.tiles_tab, "Tiles"),
        (self.network_tab, "Network Mapping"),
        (self.segments_tab, "Segments"),
        (self.raw_tab, "Raw XML"),
    ):
        self.tabs.addTab(page, title)

    # Build each page, then fill it from the working config.
    for step in (
        self._build_tiles_tab,
        self._build_network_tab,
        self._build_segments_tab,
        self._build_raw_tab,
        self._populate_tiles_table,
        self._rebuild_segment_tile_combo,
        self._refresh_raw_xml,
        self._refresh_network_mapping_ui,
    ):
        step()

    button_box = QDialogButtonBox(QDialogButtonBox.Save | QDialogButtonBox.Cancel)
    button_box.accepted.connect(self._save_and_accept)
    button_box.rejected.connect(self.reject)
    root_layout.addWidget(button_box)
def _build_tiles_tab(self) -> None:
    """Create the tile table that fills the Tiles tab."""
    layout = QVBoxLayout(self.tiles_tab)
    header_labels = [label for _key, label in self.TILE_COLUMNS]
    table = QTableWidget(0, len(self.TILE_COLUMNS))
    self.tiles_table = table
    table.setHorizontalHeaderLabels(header_labels)
    table.verticalHeader().setVisible(False)
    table.setAlternatingRowColors(True)
    table.setSelectionBehavior(QTableWidget.SelectRows)
    table.setSortingEnabled(False)
    layout.addWidget(table)
def _build_network_tab(self) -> None:
    """Lay out the Network Mapping tab: help text, preview and device list."""
    page_layout = QVBoxLayout(self.network_tab)
    page_layout.setSpacing(12)

    # Two explanatory labels at the top of the page.
    intro = QLabel(
        "Scan the local network for WLED controllers, flash one device at a time, "
        "then click the matching physical tile to save the assignment."
    )
    intro.setWordWrap(True)
    page_layout.addWidget(intro)
    hint = QLabel(
        "Assignments save immediately to the open mapping file when the current mapping validates. "
        "If the mapping cannot be saved yet, the assignment stays in this dialog and you will see a warning."
    )
    hint.setWordWrap(True)
    hint.setStyleSheet("color: #A8B3C7;")
    page_layout.addWidget(hint)

    columns = QHBoxLayout()
    columns.setSpacing(12)
    page_layout.addLayout(columns, 1)

    # Left column: clickable mapping preview.
    preview_group = QGroupBox("Mapping Preview")
    preview_layout = QVBoxLayout(preview_group)
    preview_layout.setSpacing(8)
    self.assignment_workflow_label = QLabel("Scan the network to begin assisted mapping.")
    self.assignment_workflow_label.setWordWrap(True)
    preview_layout.addWidget(self.assignment_workflow_label)
    self.mapping_preview = MappingAssignmentPreview()
    self.mapping_preview.tileClicked.connect(self._handle_mapping_preview_click)
    preview_layout.addWidget(self.mapping_preview, 1)
    columns.addWidget(preview_group, 1)

    # Right column: scan controls and the discovered-devices table.
    device_group = QGroupBox("Discovered WLED Devices")
    device_layout = QVBoxLayout(device_group)
    device_layout.setSpacing(8)

    scan_row = QHBoxLayout()
    self.scan_button = QPushButton("Scan Network")
    self.scan_button.clicked.connect(self._scan_network)
    scan_row.addWidget(self.scan_button)
    self.scan_status_label = QLabel("Not scanned yet.")
    self.scan_status_label.setWordWrap(True)
    scan_row.addWidget(self.scan_status_label, 1)
    device_layout.addLayout(scan_row)

    self.mapping_summary_label = QLabel("No devices discovered.")
    self.mapping_summary_label.setWordWrap(True)
    self.mapping_summary_label.setStyleSheet("color: #C7D2E1;")
    device_layout.addWidget(self.mapping_summary_label)

    table = QTableWidget(0, len(self.DEVICE_COLUMNS))
    self.discovered_devices_table = table
    table.setHorizontalHeaderLabels(self.DEVICE_COLUMNS)
    table.verticalHeader().setVisible(False)
    table.setAlternatingRowColors(True)
    table.setSelectionBehavior(QTableWidget.SelectRows)
    table.setSortingEnabled(False)
    table.itemSelectionChanged.connect(self._on_discovered_device_selection_changed)

    # Sections 0-6 size to their contents, except section 4 which stretches.
    resize_modes = [QHeaderView.ResizeToContents] * 7
    resize_modes[4] = QHeaderView.Stretch
    header = table.horizontalHeader()
    for section, mode in enumerate(resize_modes):
        header.setSectionResizeMode(section, mode)
    device_layout.addWidget(table, 1)

    self.assignment_feedback_label = QLabel("Select a device or press Identify, then click the matching tile.")
    self.assignment_feedback_label.setWordWrap(True)
    self.assignment_feedback_label.setStyleSheet("color: #A8B3C7;")
    device_layout.addWidget(self.assignment_feedback_label)
    columns.addWidget(device_group, 1)
def _build_segments_tab(self) -> None:
    """Create the Segments tab: tile selector, add/remove buttons, table."""
    page_layout = QVBoxLayout(self.segments_tab)

    # Top row: tile selector followed by the two segment buttons.
    selector_row = QHBoxLayout()
    selector_row.addWidget(QLabel("Tile"))
    self.segment_tile_combo = QComboBox()
    self.segment_tile_combo.currentIndexChanged.connect(self._on_segment_tile_changed)
    selector_row.addWidget(self.segment_tile_combo, 1)
    self.add_segment_button = QPushButton("Add Segment")
    self.remove_segment_button = QPushButton("Remove Selected")
    for button in (self.add_segment_button, self.remove_segment_button):
        selector_row.addWidget(button)
    page_layout.addLayout(selector_row)

    table = QTableWidget(0, len(self.SEGMENT_COLUMNS))
    self.segments_table = table
    table.setHorizontalHeaderLabels([label for _key, label in self.SEGMENT_COLUMNS])
    table.verticalHeader().setVisible(False)
    table.setAlternatingRowColors(True)
    table.setSelectionBehavior(QTableWidget.SelectRows)
    page_layout.addWidget(table, 1)

    self.add_segment_button.clicked.connect(self._add_segment_row)
    self.remove_segment_button.clicked.connect(self._remove_selected_segments)
def _build_raw_tab(self) -> None:
    """Create the Raw XML tab: action buttons, editor and validation list."""
    page_layout = QVBoxLayout(self.raw_tab)

    self.refresh_raw_button = QPushButton("Refresh From Tables")
    self.validate_raw_button = QPushButton("Validate XML")
    self.apply_raw_button = QPushButton("Apply XML To Tables")
    actions = QHBoxLayout()
    for button in (self.refresh_raw_button, self.validate_raw_button, self.apply_raw_button):
        actions.addWidget(button)
    actions.addStretch(1)
    page_layout.addLayout(actions)

    editor = QPlainTextEdit()
    self.raw_editor = editor
    editor.setLineWrapMode(QPlainTextEdit.NoWrap)
    editor.document().setModified(False)
    page_layout.addWidget(editor, 1)

    page_layout.addWidget(QLabel("Validation"))
    self.error_list = QListWidget()
    page_layout.addWidget(self.error_list)

    self.refresh_raw_button.clicked.connect(self._refresh_raw_xml)
    self.validate_raw_button.clicked.connect(self._validate_raw_xml)
    self.apply_raw_button.clicked.connect(self._apply_raw_xml)
def _populate_tiles_table(self) -> None:
    """Fill the tiles table with one row per tile, in ``sorted_tiles()`` order."""
    table = self.tiles_table
    table.setRowCount(len(self.working_config.tiles))
    for row, tile in enumerate(self.working_config.sorted_tiles()):
        for column, (key, _label) in enumerate(self.TILE_COLUMNS):
            if key == "enabled":
                # Checkbox cell instead of text.
                item = QTableWidgetItem()
                item.setFlags(item.flags() | Qt.ItemIsUserCheckable | Qt.ItemIsEnabled | Qt.ItemIsSelectable)
                item.setCheckState(Qt.Checked if tile.enabled else Qt.Unchecked)
            elif key == "brightness_factor":
                item = QTableWidgetItem(f"{tile.brightness_factor:.3f}")
            else:
                # Every remaining column key is also the tile attribute name.
                item = QTableWidgetItem(str(getattr(tile, key)))
            table.setItem(row, column, item)
    table.resizeColumnsToContents()
def _rebuild_segment_tile_combo(self) -> None:
    """Refill the tile selector and reload the segments table for its tile.

    The previously active tile stays selected when it still exists; otherwise
    the first entry is selected.
    """
    combo = self.segment_tile_combo
    # Signals stay blocked so refilling does not trigger the change slot.
    combo.blockSignals(True)
    combo.clear()
    for tile in self.working_config.sorted_tiles():
        combo.addItem(tile.tile_id, tile.tile_id)

    wanted_tile_id = self._active_segment_tile_id
    if not wanted_tile_id:
        if self.working_config.tiles:
            wanted_tile_id = self.working_config.sorted_tiles()[0].tile_id
        else:
            wanted_tile_id = None
    if wanted_tile_id:
        position = combo.findData(wanted_tile_id)
        # findData returns -1 when the id is gone; fall back to index 0.
        combo.setCurrentIndex(max(0, position))
    self._active_segment_tile_id = combo.currentData()
    combo.blockSignals(False)
    self._populate_segments_table()
def _populate_segments_table(self) -> None:
    """Show the segments of the tile currently selected in the combo box."""
    table = self.segments_table
    tile = self._active_tile_for_segments()
    table.setRowCount(0)
    if tile is None:
        return
    table.setRowCount(len(tile.segments))
    coordinate_keys = ("x0", "y0", "x1", "y1")
    for row, segment in enumerate(tile.segments):
        for column, (key, _label) in enumerate(self.SEGMENT_COLUMNS):
            if key == "reverse":
                # Checkbox cell instead of text.
                item = QTableWidgetItem()
                item.setFlags(item.flags() | Qt.ItemIsUserCheckable | Qt.ItemIsEnabled | Qt.ItemIsSelectable)
                item.setCheckState(Qt.Checked if segment.reverse else Qt.Unchecked)
            elif key == "orientation_rad":
                item = QTableWidgetItem(f"{segment.orientation_rad:.5f}")
            elif key in coordinate_keys:
                item = QTableWidgetItem(f"{getattr(segment, key):.3f}")
            else:
                # name, side, start_channel, led_count: plain str().
                item = QTableWidgetItem(str(getattr(segment, key)))
            table.setItem(row, column, item)
    table.resizeColumnsToContents()
def _active_tile_for_segments(self):
return self.working_config.tile_lookup().get(self.segment_tile_combo.currentData())
def _on_segment_tile_changed(self) -> None:
previous_tile_id = self._active_segment_tile_id
self._sync_segments_table(previous_tile_id)
self._active_segment_tile_id = self.segment_tile_combo.currentData()
self._populate_segments_table()
def _add_segment_row(self) -> None:
    """Append a segment row pre-filled with default values."""
    table = self.segments_table
    new_row = table.rowCount()
    table.insertRow(new_row)
    # Text defaults for every column except the trailing checkbox column.
    text_defaults = ("New Segment", "left", "1", "1", "0.0", "0.0", "0.0", "0.0", "0.0")
    checkbox_column = len(self.SEGMENT_COLUMNS) - 1
    for column in range(len(self.SEGMENT_COLUMNS)):
        if column == checkbox_column:
            item = QTableWidgetItem()
            item.setFlags(item.flags() | Qt.ItemIsUserCheckable | Qt.ItemIsEnabled | Qt.ItemIsSelectable)
            item.setCheckState(Qt.Unchecked)
        else:
            item = QTableWidgetItem(text_defaults[column])
        table.setItem(new_row, column, item)
def _remove_selected_segments(self) -> None:
rows = sorted({item.row() for item in self.segments_table.selectedItems()}, reverse=True)
for row in rows:
self.segments_table.removeRow(row)
def _refresh_raw_xml(self) -> None:
self._sync_tables_to_config()
self._set_raw_xml_snapshot()
self.error_list.clear()
def _set_raw_xml_snapshot(self) -> None:
    """Load the working config's XML into the editor and mark it unmodified."""
    editor = self.raw_editor
    xml_text = config_to_xml_string(self.working_config)
    editor.setPlainText(xml_text)
    editor.document().setModified(False)
def _validate_raw_xml(self) -> None:
    """Parse and validate the editor text, listing the outcome below it."""
    messages = self.error_list
    messages.clear()
    xml_text = self.raw_editor.toPlainText()
    try:
        parsed = load_config_from_string(xml_text, validate=True)
    except MappingValidationError as exc:
        for problem in exc.errors:
            messages.addItem(problem)
    else:
        messages.addItem(f"XML valid. {len(parsed.tiles)} tiles loaded.")
def _apply_raw_xml(self) -> None:
    """Replace the working config with the editor's XML and refresh all views.

    Validation errors are listed in the error list; the working config is
    left unchanged when parsing fails.
    """
    messages = self.error_list
    messages.clear()
    try:
        self.working_config = load_config_from_string(self.raw_editor.toPlainText(), validate=True)
        loaded = self.working_config
        # Push the loaded values back into the header fields and tables.
        self.name_edit.setText(loaded.name)
        self.rows_spin.setValue(loaded.logical_display.rows)
        self.cols_spin.setValue(loaded.logical_display.cols)
        self._populate_tiles_table()
        self._rebuild_segment_tile_combo()
        self._refresh_network_mapping_ui()
        self.raw_editor.document().setModified(False)
        messages.addItem("Applied XML to editable tables.")
    except MappingValidationError as exc:
        for problem in exc.errors:
            messages.addItem(problem)
def _handle_tab_changed(self, index: int) -> None:
    """Bring the newly shown tab up to date with edits made on other tabs.

    When the table contents cannot be converted, the Tiles tab is shown again
    and a warning explains what went wrong.
    """
    try:
        shown = self.tabs.widget(index)
        if shown is self.network_tab:
            self._sync_general_fields()
            self._sync_tiles_table()
            self._sync_segments_table()
            self._refresh_network_mapping_ui()
        elif shown is self.segments_tab:
            self._sync_tiles_table()
            self._rebuild_segment_tile_combo()
        elif shown is self.raw_tab:
            # Never overwrite XML the user has edited but not applied.
            if not self.raw_editor.document().isModified():
                self._refresh_raw_xml()
    except (MappingValidationError, ValueError, KeyError, StopIteration) as exc:
        # Signals are blocked so this forced switch does not re-enter the slot.
        self.tabs.blockSignals(True)
        self.tabs.setCurrentWidget(self.tiles_tab)
        self.tabs.blockSignals(False)
        QMessageBox.warning(
            self,
            "Mapping Error",
            f"Please resolve the current tile or segment values before switching tabs.\n\n{exc}",
        )
def _sync_tables_to_config(self) -> None:
self._sync_general_fields()
self._sync_tiles_table()
self._sync_segments_table()
def _sync_general_fields(self) -> None:
self.working_config.name = self.name_edit.text().strip() or self.working_config.name
self.working_config.logical_display.rows = self.rows_spin.value()
self.working_config.logical_display.cols = self.cols_spin.value()
def _sync_tiles_table(self) -> None:
tiles = self.working_config.sorted_tiles()
for row, tile in enumerate(tiles):
tile.tile_id = self._text(row, "tile_id")
tile.row = int(self._text(row, "row"))
tile.col = int(self._text(row, "col"))
tile.screen_name = self._text(row, "screen_name")
tile.controller_ip = self._text(row, "controller_ip")
tile.controller_name = self._text(row, "controller_name")
tile.controller_host = self._text(row, "controller_host")
tile.controller_mac = self._text(row, "controller_mac")
tile.subnet = int(self._text(row, "subnet"))
tile.universe = int(self._text(row, "universe"))
tile.led_total = int(self._text(row, "led_total"))
tile.brightness_factor = float(self._text(row, "brightness_factor"))
tile.enabled = self._check(row, "enabled")
self.working_config.tiles = tiles
def _sync_segments_table(self, tile_id: str | None = None) -> None:
    """Rebuild one tile's segment list from the segments table.

    ``tile_id`` names the tile to update. When omitted, the active segment
    tile is used, falling back to the combo box selection. Nothing happens if
    the tile is not in the working config. Raises ``ValueError`` when a
    numeric cell cannot be converted.
    """
    if tile_id is not None:
        lookup_id = tile_id
    else:
        lookup_id = self._active_segment_tile_id or self.segment_tile_combo.currentData()
    tile = self.working_config.tile_lookup().get(lookup_id)
    if tile is None:
        return

    def segment_from_row(row: int) -> SegmentConfig:
        cell = self._segment_text
        return SegmentConfig(
            name=cell(row, "name"),
            side=cell(row, "side"),
            start_channel=int(cell(row, "start_channel")),
            led_count=int(cell(row, "led_count")),
            orientation_rad=float(cell(row, "orientation_rad")),
            x0=float(cell(row, "x0")),
            y0=float(cell(row, "y0")),
            x1=float(cell(row, "x1")),
            y1=float(cell(row, "y1")),
            reverse=self._segment_check(row, "reverse"),
        )

    # The tile is only updated once every row has converted successfully.
    rebuilt: list[SegmentConfig] = [segment_from_row(row) for row in range(self.segments_table.rowCount())]
    tile.segments = rebuilt
def _scan_network(self) -> None:
    """Start a background network scan using the current table contents."""
    # Same three syncs as _sync_tables_to_config, in the same order.
    self._sync_tables_to_config()

    # Forget results and selections from any earlier scan.
    self.discovered_devices = []
    self._active_device_key = None
    self._selected_assignment_tile_id = None

    worker = NetworkScanWorker(self.working_config)
    self._scan_worker = worker
    worker.progress.connect(self._on_scan_progress)
    worker.finished.connect(self._on_scan_finished)

    self.scan_button.setEnabled(False)
    self.scan_status_label.setText("Scanning local subnets for WLED devices...")
    self.assignment_feedback_label.setText("Scanning the network. Results will appear below as devices respond.")
    self._refresh_network_mapping_ui()
    worker.start()
def _on_scan_progress(self, completed: int, total: int, device: DiscoveredWledDevice | None) -> None:
self.scan_status_label.setText(f"Scanning {completed}/{total} hosts...")
if device is None:
return
if any(self._device_key(existing) == self._device_key(device) for existing in self.discovered_devices):
return
self.discovered_devices.append(device)
self.discovered_devices.sort(key=lambda item: tuple(int(part) for part in item.ip_address.split(".")))
if self._active_device() is None:
self._active_device_key = self._device_key(self.discovered_devices[0])
self._refresh_network_mapping_ui()
def _on_scan_finished(self, devices: list[DiscoveredWledDevice], error_message: str, host_count: int) -> None:
self.scan_button.setEnabled(True)
self._scan_worker = None
self._last_scan_host_count = host_count
if error_message:
self.scan_status_label.setText("Network scan failed.")
self.assignment_feedback_label.setText(error_message)
QMessageBox.warning(self, "Network Scan Error", error_message)
return
self.discovered_devices = list(devices)
if self._active_device() is None and self.discovered_devices:
self._active_device_key = self._device_key(self.discovered_devices[0])
if not self.discovered_devices:
self.scan_status_label.setText(f"No WLED devices found after scanning {host_count} hosts.")
self.assignment_feedback_label.setText("No WLED devices were detected. Check the subnet, power, and network connectivity.")
else:
self.scan_status_label.setText(f"Found {len(self.discovered_devices)} WLED device(s) across {host_count} hosts.")
self._refresh_network_mapping_ui()
def _on_discovered_device_selection_changed(self) -> None:
    """Make the device in the newly selected table row the active device."""
    # Selection changes caused by our own table rebuild are ignored.
    if self._device_table_refreshing:
        return
    table = self.discovered_devices_table
    current_row = table.currentRow()
    first_cell = table.item(current_row, 0) if current_row >= 0 else None
    if first_cell is None:
        return
    # The device key is stored on each item under Qt.UserRole.
    stored_key = first_cell.data(Qt.UserRole)
    if isinstance(stored_key, str):
        self._select_device_for_assignment(stored_key, select_table=False)
def _select_device_for_assignment(self, device_key: str, *, select_table: bool = True) -> None:
    """Make ``device_key`` the active device and refresh the mapping views.

    If the device is already mapped, its tile becomes the selected tile.
    ``select_table`` controls whether the matching table row is selected too.
    """
    self._active_device_key = device_key
    device = self._active_device()
    if device is not None:
        mapped_tile = find_tile_for_device(self.working_config, device)
        if mapped_tile is not None:
            self._selected_assignment_tile_id = mapped_tile.tile_id
    if select_table:
        self._select_device_row(device_key)
    self._refresh_network_mapping_ui()
def _select_device_row(self, device_key: str) -> None:
    """Select the first table row whose stored device key equals ``device_key``."""
    table = self.discovered_devices_table
    for row in range(table.rowCount()):
        first_cell = table.item(row, 0)
        if first_cell is not None and first_cell.data(Qt.UserRole) == device_key:
            table.selectRow(row)
            return
def _identify_device(self, device_key: str) -> None:
    """Select the device and start a background identify request for it."""
    target = self._device_for_key(device_key)
    if target is None:
        return
    self._select_device_for_assignment(device_key)

    identify_worker = DeviceIdentifyWorker(target)
    # The set holds a reference to the worker until it reports back.
    self._identify_workers.add(identify_worker)
    identify_worker.finished.connect(self._on_identify_finished)
    self.assignment_feedback_label.setText(
        f"Identifying {target.ip_address}. Watch for the red pulse, then click the matching tile."
    )
    identify_worker.start()
def _on_identify_finished(self, device: DiscoveredWledDevice, error_message: str) -> None:
    """Release the finished identify worker and report its outcome."""
    # Drop the first tracked worker that was created for this device.
    done_worker = next((w for w in self._identify_workers if w._device == device), None)
    if done_worker is not None:
        self._identify_workers.discard(done_worker)

    if not error_message:
        mapped_tile = find_tile_for_device(self.working_config, device)
        if mapped_tile is not None:
            self._selected_assignment_tile_id = mapped_tile.tile_id
        self.assignment_feedback_label.setText(
            f"{device.ip_address} pulsed successfully. Click the matching tile to store the assignment."
        )
    else:
        self.assignment_feedback_label.setText(error_message)
        QMessageBox.warning(self, "Identify Failed", error_message)
    self._refresh_network_mapping_ui()
def _handle_mapping_preview_click(self, tile_id: str) -> None:
self._selected_assignment_tile_id = tile_id
active_device = self._active_device()
if active_device is None:
self.assignment_feedback_label.setText(f"{tile_id} selected. Choose a device row or press Identify first.")
self._refresh_network_mapping_ui()
return
self._assign_active_device_to_tile(active_device, tile_id)
def _assign_active_device_to_tile(self, device: DiscoveredWledDevice, tile_id: str) -> None:
    """Assign ``device`` to ``tile_id``, try to save, and report the result.

    Afterwards the next unmapped device, if any, becomes the active device.
    """
    target_tile = self.working_config.tile_lookup().get(tile_id)
    if target_tile is None:
        self.assignment_feedback_label.setText(f"Tile {tile_id} no longer exists in the current mapping.")
        return

    # Capture the "before" picture for the summary before mutating anything.
    previous_tile = find_tile_for_device(self.working_config, device)
    displaced_tile_text = ""
    already_assigned = target_tile.controller_ip or target_tile.controller_mac
    if already_assigned and not tile_matches_device(target_tile, device):
        displaced_tile_text = target_tile.controller_ip or target_tile.controller_name or target_tile.controller_mac

    assign_device_to_tile(self.working_config, device, tile_id)
    self._selected_assignment_tile_id = tile_id
    self._populate_tiles_table()
    # Leave user-edited raw XML alone; only refresh an untouched snapshot.
    if not self.raw_editor.document().isModified():
        self._set_raw_xml_snapshot()
    persisted, persistence_message = self._persist_working_mapping()

    notes = [f"{device.ip_address} -> {tile_id}"]
    if previous_tile is not None and previous_tile.tile_id != tile_id:
        notes.append(f"moved from {previous_tile.tile_id}")
    if displaced_tile_text:
        notes.append(f"replaced {displaced_tile_text}")
    summary = ", ".join(notes)

    if not persisted:
        self.assignment_feedback_label.setText(f"Stored {summary} in the dialog. {persistence_message}")
        QMessageBox.warning(self, "Assignment Not Saved Yet", persistence_message)
    else:
        self.assignment_feedback_label.setText(f"Saved {summary}. {persistence_message}")
    self._refresh_network_mapping_ui()
    self._select_next_unmapped_device()
def _persist_working_mapping(self) -> tuple[bool, str]:
    """Try to write the working config to the open mapping file.

    Returns ``(saved, message)``. ``saved`` is ``False`` when the raw XML has
    unapplied edits, no target path is known, validation fails, or writing
    fails; ``message`` explains the outcome either way.
    """
    if self.raw_editor.document().isModified():
        return (
            False,
            "Raw XML has unapplied changes. Apply or discard those edits before assisted assignments can be written to disk.",
        )

    # The controller's path wins over the path stored on the config.
    controller = self.controller
    if controller is not None and controller.mapping_path is not None:
        target_path = controller.mapping_path
    elif self.working_config.file_path is not None:
        target_path = self.working_config.file_path
    else:
        target_path = None
    if target_path is None:
        return False, "No mapping file is currently open. Use Save or Save As after closing Mapping Settings."

    validation = validate_config(self.working_config)
    if not validation.is_valid:
        return False, "The mapping is currently invalid and could not be saved: " + "; ".join(validation.errors)

    try:
        save_config(self.working_config, target_path)
    except MappingValidationError as exc:
        return False, "The mapping could not be saved: " + "; ".join(exc.errors)
    except (OSError, ValueError) as exc:
        return False, f"The mapping file could not be written: {exc}"

    if self.controller is not None:
        # Hand the controller its own clone of what was just written.
        self.controller.replace_config(self.working_config.clone(), path=target_path)
    return True, f"Wrote {target_path.name}."
def _select_next_unmapped_device(self) -> None:
    """Activate the first discovered device that has no tile; else just refresh."""
    next_unmapped = next(
        (
            candidate
            for candidate in self.discovered_devices
            if find_tile_for_device(self.working_config, candidate) is None
        ),
        None,
    )
    if next_unmapped is None:
        self._refresh_network_mapping_ui()
    else:
        self._select_device_for_assignment(self._device_key(next_unmapped))
def _refresh_network_mapping_ui(self) -> None:
    """Recompute the summary and workflow labels, preview and device table."""
    config = self.working_config
    devices = self.discovered_devices
    active_device = self._active_device()

    mapped_count = len([d for d in devices if find_tile_for_device(config, d) is not None])
    # Tiles that carry an IP or MAC but match none of the discovered devices.
    stale_tile_count = 0
    for tile in config.sorted_tiles():
        has_saved_controller = tile.controller_ip.strip() or tile.controller_mac.strip()
        if has_saved_controller and not any(tile_matches_device(tile, d) for d in devices):
            stale_tile_count += 1
    unmapped_count = max(0, len(devices) - mapped_count)

    if devices:
        summary = f"{len(self.discovered_devices)} discovered | {mapped_count} mapped | {unmapped_count} unmapped"
        if stale_tile_count:
            summary += f" | {stale_tile_count} saved assignment(s) not seen in this scan"
    else:
        summary = "No devices discovered yet."
    self.mapping_summary_label.setText(summary)

    if active_device is None:
        workflow_text = "Select a device row or press Identify, then click the matching tile."
    else:
        mapped_tile = find_tile_for_device(config, active_device)
        if mapped_tile is None:
            workflow_text = f"Active device: {active_device.ip_address}. Click the physical tile that just flashed."
        else:
            workflow_text = (
                f"Active device: {active_device.ip_address} is currently mapped to {mapped_tile.tile_id}. "
                "Click a different tile to reassign it."
            )
    self.assignment_workflow_label.setText(workflow_text)

    self.mapping_preview.set_assignment_state(
        self.working_config,
        self.discovered_devices,
        active_device=active_device,
        selected_tile_id=self._selected_assignment_tile_id,
    )
    self._refresh_discovered_devices_table()
def _refresh_discovered_devices_table(self) -> None:
    """Rebuild every row of the discovered-devices table.

    Each row gets five text cells plus "Map This" and "Identify" buttons.
    """
    table = self.discovered_devices_table
    # While this flag is set the selection-changed slot returns early.
    self._device_table_refreshing = True
    try:
        table.setRowCount(len(self.discovered_devices))
        for row, device in enumerate(self.discovered_devices):
            mapped_tile = find_tile_for_device(self.working_config, device)
            if mapped_tile is None:
                status = "Unmapped"
            else:
                saved_ip = mapped_tile.controller_ip.strip()
                if saved_ip and saved_ip != device.ip_address:
                    status = f"Mapped to {mapped_tile.tile_id} (saved IP {mapped_tile.controller_ip})"
                else:
                    status = f"Mapped to {mapped_tile.tile_id}"

            cell_texts = (
                device.ip_address,
                device.hostname,
                device.instance_name,
                device.mac_address,
                status,
            )
            device_key = self._device_key(device)
            for column, text in enumerate(cell_texts):
                item = QTableWidgetItem(text)
                # Every cell carries the device key so rows can be found again.
                item.setData(Qt.UserRole, device_key)
                if device_key == self._active_device_key:
                    item.setBackground(Qt.darkBlue)
                    item.setForeground(Qt.white)
                elif mapped_tile is not None:
                    item.setBackground(Qt.darkGreen)
                table.setItem(row, column, item)

            # The key is bound as a default argument so each button keeps its own.
            map_button = QPushButton("Map This")
            map_button.clicked.connect(lambda _checked=False, key=device_key: self._select_device_for_assignment(key))
            table.setCellWidget(row, 5, map_button)
            identify_button = QPushButton("Identify")
            identify_button.clicked.connect(lambda _checked=False, key=device_key: self._identify_device(key))
            table.setCellWidget(row, 6, identify_button)
        table.resizeRowsToContents()
        if self._active_device_key is not None:
            self._select_device_row(self._active_device_key)
    finally:
        self._device_table_refreshing = False
def _device_for_key(self, device_key: str | None) -> DiscoveredWledDevice | None:
if not device_key:
return None
for device in self.discovered_devices:
if self._device_key(device) == device_key:
return device
return None
def _active_device(self) -> DiscoveredWledDevice | None:
return self._device_for_key(self._active_device_key)
def _device_key(self, device: DiscoveredWledDevice) -> str:
return device.mac_address or device.ip_address
def _text(self, row: int, key: str) -> str:
column = next(index for index, (field, _) in enumerate(self.TILE_COLUMNS) if field == key)
item = self.tiles_table.item(row, column)
return item.text().strip() if item else ""
def _segment_text(self, row: int, key: str) -> str:
column = next(index for index, (field, _) in enumerate(self.SEGMENT_COLUMNS) if field == key)
item = self.segments_table.item(row, column)
return item.text().strip() if item else ""
def _check(self, row: int, key: str) -> bool:
    """Return whether the tiles-table cell in column ``key`` is checked.

    A missing cell counts as unchecked.
    """
    column = next(position for position, (field, _label) in enumerate(self.TILE_COLUMNS) if field == key)
    cell = self.tiles_table.item(row, column)
    if not cell:
        return False
    return bool(cell.checkState() == Qt.Checked)
def _segment_check(self, row: int, key: str) -> bool:
    """Return whether the segments-table cell in column ``key`` is checked.

    A missing cell counts as unchecked.
    """
    column = next(position for position, (field, _label) in enumerate(self.SEGMENT_COLUMNS) if field == key)
    cell = self.segments_table.item(row, column)
    if not cell:
        return False
    return bool(cell.checkState() == Qt.Checked)
def _save_and_accept(self) -> None:
    """Validate the edited mapping and close the dialog with it on success.

    If the raw XML has unapplied edits, it is parsed and becomes the working
    config; otherwise the tables are synced into the working config. On any
    validation or conversion error the problems are listed on the Raw XML tab,
    a warning is shown, and the dialog stays open. On success the working
    config is stored in ``result_config`` and the dialog is accepted.
    """
    try:
        if self.raw_editor.document().isModified():
            self.working_config = load_config_from_string(self.raw_editor.toPlainText(), validate=True)
        else:
            self._sync_tables_to_config()
        result = validate_config(self.working_config)
        if not result.is_valid:
            raise MappingValidationError(result.errors)
    except (MappingValidationError, ValueError) as exc:
        errors = exc.errors if isinstance(exc, MappingValidationError) else [str(exc)]
        # Switch tabs BEFORE filling the list. Showing the raw tab fires
        # _handle_tab_changed, which for an unmodified document calls
        # _refresh_raw_xml and clears error_list. Filling the list first
        # meant the errors were wiped before the user could read them.
        self.tabs.setCurrentWidget(self.raw_tab)
        self.error_list.clear()
        for error in errors:
            self.error_list.addItem(error)
        QMessageBox.warning(self, "Validation Error", "Please resolve the validation errors before saving.")
        return
    self.result_config = self.working_config
    self.accept()