from __future__ import annotations import threading from app.qt_compat import QComboBox, QDialog, QDialogButtonBox, QFormLayout, QGroupBox, QHBoxLayout, QHeaderView, QLabel, QLineEdit, QListWidget, QMessageBox, QObject, QPushButton, QPlainTextEdit, QSpinBox, QTabWidget, QTableWidget, QTableWidgetItem, Qt, QVBoxLayout, QWidget, Signal from app.config.device_assignment import assign_device_to_tile, find_tile_for_device, tile_matches_device from app.config.models import InfinityMirrorConfig, SegmentConfig from app.config.xml_mapping import MappingValidationError, config_to_xml_string, load_config_from_string, save_config, validate_config from app.network.wled import DiscoveredWledDevice, build_scan_hosts, discover_wled_devices, identify_wled_device from app.ui.mapping_assignment_preview import MappingAssignmentPreview class NetworkScanWorker(QObject): progress = Signal(int, int, object) finished = Signal(object, str, int) def __init__(self, config: InfinityMirrorConfig) -> None: super().__init__() self._config = config.clone() def start(self) -> None: thread = threading.Thread(target=self._run, name="InfinityMirrorNetworkScan", daemon=True) thread.start() def _run(self) -> None: try: hosts = build_scan_hosts(self._config) devices = discover_wled_devices(hosts, progress_callback=self._emit_progress) self.finished.emit(devices, "", len(hosts)) except Exception as exc: self.finished.emit([], str(exc), 0) def _emit_progress(self, completed: int, total: int, device: DiscoveredWledDevice | None) -> None: self.progress.emit(completed, total, device) class DeviceIdentifyWorker(QObject): finished = Signal(object, str) def __init__(self, device: DiscoveredWledDevice) -> None: super().__init__() self._device = device def start(self) -> None: thread = threading.Thread(target=self._run, name=f"WledIdentify-{self._device.ip_address}", daemon=True) thread.start() def _run(self) -> None: try: identify_wled_device(self._device.ip_address, led_count=self._device.led_count) except Exception as exc: self.finished.emit(self._device, str(exc)) return self.finished.emit(self._device, "") class SettingsDialog(QDialog): TILE_COLUMNS = [ ("tile_id", "Tile ID"), ("row", "Row"), ("col", "Col"), ("screen_name", "Screen Name"), ("controller_ip", "Controller IP"), ("controller_name", "Controller Name"), ("controller_host", "Controller Host"), ("controller_mac", "Controller MAC"), ("subnet", "Subnet"), ("universe", "Universe"), ("led_total", "LED Count"), ("brightness_factor", "Brightness"), ("enabled", "Enabled"), ] SEGMENT_COLUMNS = [ ("name", "Name"), ("side", "Side"), ("start_channel", "Start Ch"), ("led_count", "LED Count"), ("orientation_rad", "Orientation"), ("x0", "x0"), ("y0", "y0"), ("x1", "x1"), ("y1", "y1"), ("reverse", "Reverse"), ] DEVICE_COLUMNS = [ "IP Address", "Host / mDNS", "WLED Name", "MAC Address", "Status", "Map", "Identify", ] def __init__(self, config: InfinityMirrorConfig, controller=None, parent: QWidget | None = None) -> None: super().__init__(parent) self.setWindowTitle("Mapping Settings") self.resize(1320, 900) self.controller = controller self.working_config = config.clone() self.result_config: InfinityMirrorConfig | None = None self._active_segment_tile_id: str | None = None self._device_table_refreshing = False self._active_device_key: str | None = None self._selected_assignment_tile_id: str | None = None self._scan_worker: NetworkScanWorker | None = None self._identify_workers: set[DeviceIdentifyWorker] = set() self.discovered_devices: list[DiscoveredWledDevice] = [] self._last_scan_host_count = 0 root_layout = QVBoxLayout(self) header_group = QGroupBox("Mapping") header_form = QFormLayout(header_group) self.name_edit = QLineEdit(self.working_config.name) self.rows_spin = QSpinBox() self.rows_spin.setRange(1, 24) self.rows_spin.setValue(self.working_config.logical_display.rows) self.cols_spin = QSpinBox() self.cols_spin.setRange(1, 24) self.cols_spin.setValue(self.working_config.logical_display.cols) header_form.addRow("Name", self.name_edit) header_form.addRow("Rows", self.rows_spin) header_form.addRow("Cols", self.cols_spin) root_layout.addWidget(header_group) self.tabs = QTabWidget() self.tabs.currentChanged.connect(self._handle_tab_changed) root_layout.addWidget(self.tabs, 1) self.tiles_tab = QWidget() self.network_tab = QWidget() self.segments_tab = QWidget() self.raw_tab = QWidget() self.tabs.addTab(self.tiles_tab, "Tiles") self.tabs.addTab(self.network_tab, "Network Mapping") self.tabs.addTab(self.segments_tab, "Segments") self.tabs.addTab(self.raw_tab, "Raw XML") self._build_tiles_tab() self._build_network_tab() self._build_segments_tab() self._build_raw_tab() self._populate_tiles_table() self._rebuild_segment_tile_combo() self._refresh_raw_xml() self._refresh_network_mapping_ui() button_box = QDialogButtonBox(QDialogButtonBox.Save | QDialogButtonBox.Cancel) button_box.accepted.connect(self._save_and_accept) button_box.rejected.connect(self.reject) root_layout.addWidget(button_box) def _build_tiles_tab(self) -> None: layout = QVBoxLayout(self.tiles_tab) self.tiles_table = QTableWidget(0, len(self.TILE_COLUMNS)) self.tiles_table.setHorizontalHeaderLabels([label for _, label in self.TILE_COLUMNS]) self.tiles_table.verticalHeader().setVisible(False) self.tiles_table.setAlternatingRowColors(True) self.tiles_table.setSelectionBehavior(QTableWidget.SelectRows) self.tiles_table.setSortingEnabled(False) layout.addWidget(self.tiles_table) def _build_network_tab(self) -> None: layout = QVBoxLayout(self.network_tab) layout.setSpacing(12) intro = QLabel( "Scan the local network for WLED controllers, flash one device at a time, " "then click the matching physical tile to save the assignment." ) intro.setWordWrap(True) layout.addWidget(intro) hint = QLabel( "Assignments save immediately to the open mapping file when the current mapping validates. " "If the mapping cannot be saved yet, the assignment stays in this dialog and you will see a warning." ) hint.setWordWrap(True) hint.setStyleSheet("color: #A8B3C7;") layout.addWidget(hint) content_row = QHBoxLayout() content_row.setSpacing(12) layout.addLayout(content_row, 1) preview_group = QGroupBox("Mapping Preview") preview_layout = QVBoxLayout(preview_group) preview_layout.setSpacing(8) self.assignment_workflow_label = QLabel("Scan the network to begin assisted mapping.") self.assignment_workflow_label.setWordWrap(True) preview_layout.addWidget(self.assignment_workflow_label) self.mapping_preview = MappingAssignmentPreview() self.mapping_preview.tileClicked.connect(self._handle_mapping_preview_click) preview_layout.addWidget(self.mapping_preview, 1) content_row.addWidget(preview_group, 1) device_group = QGroupBox("Discovered WLED Devices") device_layout = QVBoxLayout(device_group) device_layout.setSpacing(8) scan_row = QHBoxLayout() self.scan_button = QPushButton("Scan Network") self.scan_button.clicked.connect(self._scan_network) scan_row.addWidget(self.scan_button) self.scan_status_label = QLabel("Not scanned yet.") self.scan_status_label.setWordWrap(True) scan_row.addWidget(self.scan_status_label, 1) device_layout.addLayout(scan_row) self.mapping_summary_label = QLabel("No devices discovered.") self.mapping_summary_label.setWordWrap(True) self.mapping_summary_label.setStyleSheet("color: #C7D2E1;") device_layout.addWidget(self.mapping_summary_label) self.discovered_devices_table = QTableWidget(0, len(self.DEVICE_COLUMNS)) self.discovered_devices_table.setHorizontalHeaderLabels(self.DEVICE_COLUMNS) self.discovered_devices_table.verticalHeader().setVisible(False) self.discovered_devices_table.setAlternatingRowColors(True) self.discovered_devices_table.setSelectionBehavior(QTableWidget.SelectRows) self.discovered_devices_table.setSortingEnabled(False) self.discovered_devices_table.itemSelectionChanged.connect(self._on_discovered_device_selection_changed) header = self.discovered_devices_table.horizontalHeader() header.setSectionResizeMode(0, QHeaderView.ResizeToContents) header.setSectionResizeMode(1, QHeaderView.ResizeToContents) header.setSectionResizeMode(2, QHeaderView.ResizeToContents) header.setSectionResizeMode(3, QHeaderView.ResizeToContents) header.setSectionResizeMode(4, QHeaderView.Stretch) header.setSectionResizeMode(5, QHeaderView.ResizeToContents) header.setSectionResizeMode(6, QHeaderView.ResizeToContents) device_layout.addWidget(self.discovered_devices_table, 1) self.assignment_feedback_label = QLabel("Select a device or press Identify, then click the matching tile.") self.assignment_feedback_label.setWordWrap(True) self.assignment_feedback_label.setStyleSheet("color: #A8B3C7;") device_layout.addWidget(self.assignment_feedback_label) content_row.addWidget(device_group, 1) def _build_segments_tab(self) -> None: layout = QVBoxLayout(self.segments_tab) top_row = QHBoxLayout() top_row.addWidget(QLabel("Tile")) self.segment_tile_combo = QComboBox() self.segment_tile_combo.currentIndexChanged.connect(self._on_segment_tile_changed) top_row.addWidget(self.segment_tile_combo, 1) self.add_segment_button = QPushButton("Add Segment") self.remove_segment_button = QPushButton("Remove Selected") top_row.addWidget(self.add_segment_button) top_row.addWidget(self.remove_segment_button) layout.addLayout(top_row) self.segments_table = QTableWidget(0, len(self.SEGMENT_COLUMNS)) self.segments_table.setHorizontalHeaderLabels([label for _, label in self.SEGMENT_COLUMNS]) self.segments_table.verticalHeader().setVisible(False) self.segments_table.setAlternatingRowColors(True) self.segments_table.setSelectionBehavior(QTableWidget.SelectRows) layout.addWidget(self.segments_table, 1) self.add_segment_button.clicked.connect(self._add_segment_row) self.remove_segment_button.clicked.connect(self._remove_selected_segments) def _build_raw_tab(self) -> None: layout = QVBoxLayout(self.raw_tab) button_row = QHBoxLayout() self.refresh_raw_button = QPushButton("Refresh From Tables") self.validate_raw_button = QPushButton("Validate XML") self.apply_raw_button = QPushButton("Apply XML To Tables") button_row.addWidget(self.refresh_raw_button) button_row.addWidget(self.validate_raw_button) button_row.addWidget(self.apply_raw_button) button_row.addStretch(1) layout.addLayout(button_row) self.raw_editor = QPlainTextEdit() self.raw_editor.setLineWrapMode(QPlainTextEdit.NoWrap) self.raw_editor.document().setModified(False) layout.addWidget(self.raw_editor, 1) layout.addWidget(QLabel("Validation")) self.error_list = QListWidget() layout.addWidget(self.error_list) self.refresh_raw_button.clicked.connect(self._refresh_raw_xml) self.validate_raw_button.clicked.connect(self._validate_raw_xml) self.apply_raw_button.clicked.connect(self._apply_raw_xml) def _populate_tiles_table(self) -> None: self.tiles_table.setRowCount(len(self.working_config.tiles)) for row, tile in enumerate(self.working_config.sorted_tiles()): values = { "tile_id": tile.tile_id, "row": str(tile.row), "col": str(tile.col), "screen_name": tile.screen_name, "controller_ip": tile.controller_ip, "controller_name": tile.controller_name, "controller_host": tile.controller_host, "controller_mac": tile.controller_mac, "subnet": str(tile.subnet), "universe": str(tile.universe), "led_total": str(tile.led_total), "brightness_factor": f"{tile.brightness_factor:.3f}", "enabled": tile.enabled, } for column, (key, _) in enumerate(self.TILE_COLUMNS): if key == "enabled": item = QTableWidgetItem() item.setFlags(item.flags() | Qt.ItemIsUserCheckable | Qt.ItemIsEnabled | Qt.ItemIsSelectable) item.setCheckState(Qt.Checked if tile.enabled else Qt.Unchecked) else: item = QTableWidgetItem(str(values[key])) self.tiles_table.setItem(row, column, item) self.tiles_table.resizeColumnsToContents() def _rebuild_segment_tile_combo(self) -> None: self.segment_tile_combo.blockSignals(True) self.segment_tile_combo.clear() for tile in self.working_config.sorted_tiles(): self.segment_tile_combo.addItem(tile.tile_id, tile.tile_id) current_tile_id = self._active_segment_tile_id or (self.working_config.sorted_tiles()[0].tile_id if self.working_config.tiles else None) if current_tile_id: index = self.segment_tile_combo.findData(current_tile_id) self.segment_tile_combo.setCurrentIndex(max(0, index)) self._active_segment_tile_id = self.segment_tile_combo.currentData() self.segment_tile_combo.blockSignals(False) self._populate_segments_table() def _populate_segments_table(self) -> None: tile = self._active_tile_for_segments() self.segments_table.setRowCount(0) if tile is None: return self.segments_table.setRowCount(len(tile.segments)) for row, segment in enumerate(tile.segments): values = { "name": segment.name, "side": segment.side, "start_channel": str(segment.start_channel), "led_count": str(segment.led_count), "orientation_rad": f"{segment.orientation_rad:.5f}", "x0": f"{segment.x0:.3f}", "y0": f"{segment.y0:.3f}", "x1": f"{segment.x1:.3f}", "y1": f"{segment.y1:.3f}", "reverse": segment.reverse, } for column, (key, _) in enumerate(self.SEGMENT_COLUMNS): if key == "reverse": item = QTableWidgetItem() item.setFlags(item.flags() | Qt.ItemIsUserCheckable | Qt.ItemIsEnabled | Qt.ItemIsSelectable) item.setCheckState(Qt.Checked if segment.reverse else Qt.Unchecked) else: item = QTableWidgetItem(str(values[key])) self.segments_table.setItem(row, column, item) self.segments_table.resizeColumnsToContents() def _active_tile_for_segments(self): return self.working_config.tile_lookup().get(self.segment_tile_combo.currentData()) def _on_segment_tile_changed(self) -> None: previous_tile_id = self._active_segment_tile_id self._sync_segments_table(previous_tile_id) self._active_segment_tile_id = self.segment_tile_combo.currentData() self._populate_segments_table() def _add_segment_row(self) -> None: row = self.segments_table.rowCount() self.segments_table.insertRow(row) defaults = ["New Segment", "left", "1", "1", "0.0", "0.0", "0.0", "0.0", "0.0", False] for column, (_, _) in enumerate(self.SEGMENT_COLUMNS): value = defaults[column] if column == len(self.SEGMENT_COLUMNS) - 1: item = QTableWidgetItem() item.setFlags(item.flags() | Qt.ItemIsUserCheckable | Qt.ItemIsEnabled | Qt.ItemIsSelectable) item.setCheckState(Qt.Unchecked) else: item = QTableWidgetItem(str(value)) self.segments_table.setItem(row, column, item) def _remove_selected_segments(self) -> None: rows = sorted({item.row() for item in self.segments_table.selectedItems()}, reverse=True) for row in rows: self.segments_table.removeRow(row) def _refresh_raw_xml(self) -> None: self._sync_tables_to_config() self._set_raw_xml_snapshot() self.error_list.clear() def _set_raw_xml_snapshot(self) -> None: self.raw_editor.setPlainText(config_to_xml_string(self.working_config)) self.raw_editor.document().setModified(False) def _validate_raw_xml(self) -> None: self.error_list.clear() try: config = load_config_from_string(self.raw_editor.toPlainText(), validate=True) self.error_list.addItem(f"XML valid. {len(config.tiles)} tiles loaded.") except MappingValidationError as exc: for error in exc.errors: self.error_list.addItem(error) def _apply_raw_xml(self) -> None: self.error_list.clear() try: self.working_config = load_config_from_string(self.raw_editor.toPlainText(), validate=True) self.name_edit.setText(self.working_config.name) self.rows_spin.setValue(self.working_config.logical_display.rows) self.cols_spin.setValue(self.working_config.logical_display.cols) self._populate_tiles_table() self._rebuild_segment_tile_combo() self._refresh_network_mapping_ui() self.raw_editor.document().setModified(False) self.error_list.addItem("Applied XML to editable tables.") except MappingValidationError as exc: for error in exc.errors: self.error_list.addItem(error) def _handle_tab_changed(self, index: int) -> None: try: if self.tabs.widget(index) is self.network_tab: self._sync_general_fields() self._sync_tiles_table() self._sync_segments_table() self._refresh_network_mapping_ui() elif self.tabs.widget(index) is self.segments_tab: self._sync_tiles_table() self._rebuild_segment_tile_combo() elif self.tabs.widget(index) is self.raw_tab and not self.raw_editor.document().isModified(): self._refresh_raw_xml() except (MappingValidationError, ValueError, KeyError, StopIteration) as exc: self.tabs.blockSignals(True) self.tabs.setCurrentWidget(self.tiles_tab) self.tabs.blockSignals(False) QMessageBox.warning( self, "Mapping Error", f"Please resolve the current tile or segment values before switching tabs.\n\n{exc}", ) def _sync_tables_to_config(self) -> None: self._sync_general_fields() self._sync_tiles_table() self._sync_segments_table() def _sync_general_fields(self) -> None: self.working_config.name = self.name_edit.text().strip() or self.working_config.name self.working_config.logical_display.rows = self.rows_spin.value() self.working_config.logical_display.cols = self.cols_spin.value() def _sync_tiles_table(self) -> None: tiles = self.working_config.sorted_tiles() for row, tile in enumerate(tiles): tile.tile_id = self._text(row, "tile_id") tile.row = int(self._text(row, "row")) tile.col = int(self._text(row, "col")) tile.screen_name = self._text(row, "screen_name") tile.controller_ip = self._text(row, "controller_ip") tile.controller_name = self._text(row, "controller_name") tile.controller_host = self._text(row, "controller_host") tile.controller_mac = self._text(row, "controller_mac") tile.subnet = int(self._text(row, "subnet")) tile.universe = int(self._text(row, "universe")) tile.led_total = int(self._text(row, "led_total")) tile.brightness_factor = float(self._text(row, "brightness_factor")) tile.enabled = self._check(row, "enabled") self.working_config.tiles = tiles def _sync_segments_table(self, tile_id: str | None = None) -> None: lookup_id = tile_id if tile_id is not None else self._active_segment_tile_id or self.segment_tile_combo.currentData() tile = self.working_config.tile_lookup().get(lookup_id) if tile is None: return segments: list[SegmentConfig] = [] for row in range(self.segments_table.rowCount()): segments.append( SegmentConfig( name=self._segment_text(row, "name"), side=self._segment_text(row, "side"), start_channel=int(self._segment_text(row, "start_channel")), led_count=int(self._segment_text(row, "led_count")), orientation_rad=float(self._segment_text(row, "orientation_rad")), x0=float(self._segment_text(row, "x0")), y0=float(self._segment_text(row, "y0")), x1=float(self._segment_text(row, "x1")), y1=float(self._segment_text(row, "y1")), reverse=self._segment_check(row, "reverse"), ) ) tile.segments = segments def _scan_network(self) -> None: self._sync_general_fields() self._sync_tiles_table() self._sync_segments_table() self.discovered_devices = [] self._active_device_key = None self._selected_assignment_tile_id = None self._scan_worker = NetworkScanWorker(self.working_config) self._scan_worker.progress.connect(self._on_scan_progress) self._scan_worker.finished.connect(self._on_scan_finished) self.scan_button.setEnabled(False) self.scan_status_label.setText("Scanning local subnets for WLED devices...") self.assignment_feedback_label.setText("Scanning the network. Results will appear below as devices respond.") self._refresh_network_mapping_ui() self._scan_worker.start() def _on_scan_progress(self, completed: int, total: int, device: DiscoveredWledDevice | None) -> None: self.scan_status_label.setText(f"Scanning {completed}/{total} hosts...") if device is None: return if any(self._device_key(existing) == self._device_key(device) for existing in self.discovered_devices): return self.discovered_devices.append(device) self.discovered_devices.sort(key=lambda item: tuple(int(part) for part in item.ip_address.split("."))) if self._active_device() is None: self._active_device_key = self._device_key(self.discovered_devices[0]) self._refresh_network_mapping_ui() def _on_scan_finished(self, devices: list[DiscoveredWledDevice], error_message: str, host_count: int) -> None: self.scan_button.setEnabled(True) self._scan_worker = None self._last_scan_host_count = host_count if error_message: self.scan_status_label.setText("Network scan failed.") self.assignment_feedback_label.setText(error_message) QMessageBox.warning(self, "Network Scan Error", error_message) return self.discovered_devices = list(devices) if self._active_device() is None and self.discovered_devices: self._active_device_key = self._device_key(self.discovered_devices[0]) if not self.discovered_devices: self.scan_status_label.setText(f"No WLED devices found after scanning {host_count} hosts.") self.assignment_feedback_label.setText("No WLED devices were detected. Check the subnet, power, and network connectivity.") else: self.scan_status_label.setText(f"Found {len(self.discovered_devices)} WLED device(s) across {host_count} hosts.") self._refresh_network_mapping_ui() def _on_discovered_device_selection_changed(self) -> None: if self._device_table_refreshing: return row = self.discovered_devices_table.currentRow() if row < 0: return item = self.discovered_devices_table.item(row, 0) if item is None: return device_key = item.data(Qt.UserRole) if isinstance(device_key, str): self._select_device_for_assignment(device_key, select_table=False) def _select_device_for_assignment(self, device_key: str, *, select_table: bool = True) -> None: self._active_device_key = device_key active_device = self._active_device() mapped_tile = find_tile_for_device(self.working_config, active_device) if active_device is not None else None self._selected_assignment_tile_id = mapped_tile.tile_id if mapped_tile is not None else self._selected_assignment_tile_id if select_table: self._select_device_row(device_key) self._refresh_network_mapping_ui() def _select_device_row(self, device_key: str) -> None: for row in range(self.discovered_devices_table.rowCount()): item = self.discovered_devices_table.item(row, 0) if item is None: continue if item.data(Qt.UserRole) == device_key: self.discovered_devices_table.selectRow(row) return def _identify_device(self, device_key: str) -> None: device = self._device_for_key(device_key) if device is None: return self._select_device_for_assignment(device_key) worker = DeviceIdentifyWorker(device) self._identify_workers.add(worker) worker.finished.connect(self._on_identify_finished) self.assignment_feedback_label.setText( f"Identifying {device.ip_address}. Watch for the red pulse, then click the matching tile." ) worker.start() def _on_identify_finished(self, device: DiscoveredWledDevice, error_message: str) -> None: for worker in list(self._identify_workers): if worker._device == device: self._identify_workers.discard(worker) break if error_message: self.assignment_feedback_label.setText(error_message) QMessageBox.warning(self, "Identify Failed", error_message) else: mapped_tile = find_tile_for_device(self.working_config, device) if mapped_tile is not None: self._selected_assignment_tile_id = mapped_tile.tile_id self.assignment_feedback_label.setText( f"{device.ip_address} pulsed successfully. Click the matching tile to store the assignment." ) self._refresh_network_mapping_ui() def _handle_mapping_preview_click(self, tile_id: str) -> None: self._selected_assignment_tile_id = tile_id active_device = self._active_device() if active_device is None: self.assignment_feedback_label.setText(f"{tile_id} selected. Choose a device row or press Identify first.") self._refresh_network_mapping_ui() return self._assign_active_device_to_tile(active_device, tile_id) def _assign_active_device_to_tile(self, device: DiscoveredWledDevice, tile_id: str) -> None: tile_lookup = self.working_config.tile_lookup() target_tile = tile_lookup.get(tile_id) if target_tile is None: self.assignment_feedback_label.setText(f"Tile {tile_id} no longer exists in the current mapping.") return previous_tile = find_tile_for_device(self.working_config, device) displaced_tile_text = "" if target_tile.controller_ip or target_tile.controller_mac: if not tile_matches_device(target_tile, device): displaced_tile_text = target_tile.controller_ip or target_tile.controller_name or target_tile.controller_mac assign_device_to_tile(self.working_config, device, tile_id) self._selected_assignment_tile_id = tile_id self._populate_tiles_table() if not self.raw_editor.document().isModified(): self._set_raw_xml_snapshot() persisted, persistence_message = self._persist_working_mapping() summary_parts = [f"{device.ip_address} -> {tile_id}"] if previous_tile is not None and previous_tile.tile_id != tile_id: summary_parts.append(f"moved from {previous_tile.tile_id}") if displaced_tile_text: summary_parts.append(f"replaced {displaced_tile_text}") summary = ", ".join(summary_parts) if persisted: self.assignment_feedback_label.setText(f"Saved {summary}. {persistence_message}") else: self.assignment_feedback_label.setText(f"Stored {summary} in the dialog. {persistence_message}") QMessageBox.warning(self, "Assignment Not Saved Yet", persistence_message) self._refresh_network_mapping_ui() self._select_next_unmapped_device() def _persist_working_mapping(self) -> tuple[bool, str]: if self.raw_editor.document().isModified(): return ( False, "Raw XML has unapplied changes. Apply or discard those edits before assisted assignments can be written to disk.", ) target_path = None if self.controller is not None and self.controller.mapping_path is not None: target_path = self.controller.mapping_path elif self.working_config.file_path is not None: target_path = self.working_config.file_path if target_path is None: return False, "No mapping file is currently open. Use Save or Save As after closing Mapping Settings." validation = validate_config(self.working_config) if not validation.is_valid: return False, "The mapping is currently invalid and could not be saved: " + "; ".join(validation.errors) try: save_config(self.working_config, target_path) except (MappingValidationError, OSError, ValueError) as exc: if isinstance(exc, MappingValidationError): return False, "The mapping could not be saved: " + "; ".join(exc.errors) return False, f"The mapping file could not be written: {exc}" if self.controller is not None: self.controller.replace_config(self.working_config.clone(), path=target_path) return True, f"Wrote {target_path.name}." def _select_next_unmapped_device(self) -> None: for device in self.discovered_devices: if find_tile_for_device(self.working_config, device) is None: self._select_device_for_assignment(self._device_key(device)) return self._refresh_network_mapping_ui() def _refresh_network_mapping_ui(self) -> None: active_device = self._active_device() mapped_count = sum(1 for device in self.discovered_devices if find_tile_for_device(self.working_config, device) is not None) stale_tile_count = sum( 1 for tile in self.working_config.sorted_tiles() if (tile.controller_ip.strip() or tile.controller_mac.strip()) and not any(tile_matches_device(tile, device) for device in self.discovered_devices) ) unmapped_count = max(0, len(self.discovered_devices) - mapped_count) if not self.discovered_devices: summary = "No devices discovered yet." else: summary = f"{len(self.discovered_devices)} discovered | {mapped_count} mapped | {unmapped_count} unmapped" if stale_tile_count: summary += f" | {stale_tile_count} saved assignment(s) not seen in this scan" self.mapping_summary_label.setText(summary) if active_device is None: self.assignment_workflow_label.setText("Select a device row or press Identify, then click the matching tile.") else: mapped_tile = find_tile_for_device(self.working_config, active_device) if mapped_tile is None: self.assignment_workflow_label.setText( f"Active device: {active_device.ip_address}. Click the physical tile that just flashed." ) else: self.assignment_workflow_label.setText( f"Active device: {active_device.ip_address} is currently mapped to {mapped_tile.tile_id}. " "Click a different tile to reassign it." ) self.mapping_preview.set_assignment_state( self.working_config, self.discovered_devices, active_device=active_device, selected_tile_id=self._selected_assignment_tile_id, ) self._refresh_discovered_devices_table() def _refresh_discovered_devices_table(self) -> None: self._device_table_refreshing = True try: self.discovered_devices_table.setRowCount(len(self.discovered_devices)) for row, device in enumerate(self.discovered_devices): mapped_tile = find_tile_for_device(self.working_config, device) status = "Unmapped" if mapped_tile is not None: if mapped_tile.controller_ip.strip() and mapped_tile.controller_ip.strip() != device.ip_address: status = f"Mapped to {mapped_tile.tile_id} (saved IP {mapped_tile.controller_ip})" else: status = f"Mapped to {mapped_tile.tile_id}" values = [ device.ip_address, device.hostname, device.instance_name, device.mac_address, status, ] device_key = self._device_key(device) for column, value in enumerate(values): item = QTableWidgetItem(value) item.setData(Qt.UserRole, device_key) if device_key == self._active_device_key: item.setBackground(Qt.darkBlue) item.setForeground(Qt.white) elif mapped_tile is not None: item.setBackground(Qt.darkGreen) self.discovered_devices_table.setItem(row, column, item) map_button = QPushButton("Map This") map_button.clicked.connect(lambda _checked=False, key=device_key: self._select_device_for_assignment(key)) self.discovered_devices_table.setCellWidget(row, 5, map_button) identify_button = QPushButton("Identify") identify_button.clicked.connect(lambda _checked=False, key=device_key: self._identify_device(key)) self.discovered_devices_table.setCellWidget(row, 6, identify_button) self.discovered_devices_table.resizeRowsToContents() if self._active_device_key is not None: self._select_device_row(self._active_device_key) finally: self._device_table_refreshing = False def _device_for_key(self, device_key: str | None) -> DiscoveredWledDevice | None: if not device_key: return None for device in self.discovered_devices: if self._device_key(device) == device_key: return device return None def _active_device(self) -> DiscoveredWledDevice | None: return self._device_for_key(self._active_device_key) def _device_key(self, device: DiscoveredWledDevice) -> str: return device.mac_address or device.ip_address def _text(self, row: int, key: str) -> str: column = next(index for index, (field, _) in enumerate(self.TILE_COLUMNS) if field == key) item = self.tiles_table.item(row, column) return item.text().strip() if item else "" def _segment_text(self, row: int, key: str) -> str: column = next(index for index, (field, _) in enumerate(self.SEGMENT_COLUMNS) if field == key) item = self.segments_table.item(row, column) return item.text().strip() if item else "" def _check(self, row: int, key: str) -> bool: column = next(index for index, (field, _) in enumerate(self.TILE_COLUMNS) if field == key) item = self.tiles_table.item(row, column) return bool(item and item.checkState() == Qt.Checked) def _segment_check(self, row: int, key: str) -> bool: column = next(index for index, (field, _) in enumerate(self.SEGMENT_COLUMNS) if field == key) item = self.segments_table.item(row, column) return bool(item and item.checkState() == Qt.Checked) def _save_and_accept(self) -> None: try: if self.raw_editor.document().isModified(): self.working_config = load_config_from_string(self.raw_editor.toPlainText(), validate=True) else: self._sync_tables_to_config() result = validate_config(self.working_config) if not result.is_valid: raise MappingValidationError(result.errors) except (MappingValidationError, ValueError) as exc: errors = exc.errors if isinstance(exc, MappingValidationError) else [str(exc)] self.error_list.clear() for error in errors: self.error_list.addItem(error) self.tabs.setCurrentWidget(self.raw_tab) QMessageBox.warning(self, "Validation Error", "Please resolve the validation errors before saving.") return self.result_config = self.working_config self.accept()