834 lines
38 KiB
Python
834 lines
38 KiB
Python
from __future__ import annotations
|
|
|
|
import threading
|
|
|
|
from app.qt_compat import QComboBox, QDialog, QDialogButtonBox, QFormLayout, QGroupBox, QHBoxLayout, QHeaderView, QLabel, QLineEdit, QListWidget, QMessageBox, QObject, QPushButton, QPlainTextEdit, QSpinBox, QTabWidget, QTableWidget, QTableWidgetItem, Qt, QVBoxLayout, QWidget, Signal
|
|
|
|
from app.config.device_assignment import assign_device_to_tile, find_tile_for_device, tile_matches_device
|
|
from app.config.models import InfinityMirrorConfig, SegmentConfig
|
|
from app.config.xml_mapping import MappingValidationError, config_to_xml_string, load_config_from_string, save_config, validate_config
|
|
from app.network.wled import DiscoveredWledDevice, build_scan_hosts, discover_wled_devices, identify_wled_device
|
|
from app.ui.mapping_assignment_preview import MappingAssignmentPreview
|
|
|
|
|
|
class NetworkScanWorker(QObject):
    """Discover WLED devices on a daemon thread and report through signals.

    Signals:
        progress: ``(completed, total, device)`` forwarded unchanged from the
            discovery progress callback; ``device`` may be ``None``.
        finished: ``(devices, error_message, host_count)``. On success the
            message is ``""``; on failure it is ``([], str(exc), 0)``.
    """

    progress = Signal(int, int, object)
    finished = Signal(object, str, int)

    def __init__(self, config: InfinityMirrorConfig) -> None:
        """Keep a clone of *config* for the scan to read."""
        super().__init__()
        # Cloned so the scan does not share the caller's config object
        # (assumes clone() returns an independent copy -- confirm in models).
        self._config = config.clone()

    def start(self) -> None:
        """Launch the scan on a new daemon thread and return immediately."""
        threading.Thread(
            target=self._run,
            name="InfinityMirrorNetworkScan",
            daemon=True,
        ).start()

    def _run(self) -> None:
        """Thread body: build the host list, discover devices, emit ``finished``."""
        try:
            scan_hosts = build_scan_hosts(self._config)
            found = discover_wled_devices(
                scan_hosts,
                progress_callback=self._emit_progress,
            )
            self.finished.emit(found, "", len(scan_hosts))
        except Exception as error:
            # Thread boundary: every failure is turned into a finished
            # signal so the receiver always gets a completion notification.
            self.finished.emit([], str(error), 0)

    def _emit_progress(self, completed: int, total: int, device: DiscoveredWledDevice | None) -> None:
        """Relay one discovery progress callback as the ``progress`` signal."""
        self.progress.emit(completed, total, device)
|
|
|
|
|
|
class DeviceIdentifyWorker(QObject):
    """Trigger the identify action for one WLED device on a daemon thread.

    Signals:
        finished: ``(device, error_message)``; the message is ``""`` when the
            identify call returned without raising.
    """

    finished = Signal(object, str)

    def __init__(self, device: DiscoveredWledDevice) -> None:
        """Remember the device whose identify action will be triggered."""
        super().__init__()
        self._device = device

    def start(self) -> None:
        """Launch the identify call on a new daemon thread."""
        runner = threading.Thread(
            target=self._run,
            name=f"WledIdentify-{self._device.ip_address}",
            daemon=True,
        )
        runner.start()

    def _run(self) -> None:
        """Thread body: call ``identify_wled_device`` and emit the outcome."""
        target = self._device
        try:
            identify_wled_device(target.ip_address, led_count=target.led_count)
        except Exception as error:
            # Thread boundary: report the failure text instead of raising.
            self.finished.emit(target, str(error))
        else:
            self.finished.emit(target, "")
|
|
|
|
|
|
class SettingsDialog(QDialog):
    """Dialog for editing a mapping configuration.

    The dialog works on a clone of the configuration it is given and offers
    four tabs: an editable tile table, an assisted network-mapping workflow
    (scan, identify, click a tile), an editable per-tile segment table, and a
    raw XML editor.  When Save succeeds the edited configuration is exposed
    as ``result_config``; until then ``result_config`` is ``None``.
    Assignments made on the network tab are additionally written to the open
    mapping file right away when that is possible.
    """

    # (attribute key, header label) for each column of the tiles table.
    TILE_COLUMNS = [
        ("tile_id", "Tile ID"),
        ("row", "Row"),
        ("col", "Col"),
        ("screen_name", "Screen Name"),
        ("controller_ip", "Controller IP"),
        ("controller_name", "Controller Name"),
        ("controller_host", "Controller Host"),
        ("controller_mac", "Controller MAC"),
        ("subnet", "Subnet"),
        ("universe", "Universe"),
        ("led_total", "LED Count"),
        ("brightness_factor", "Brightness"),
        ("enabled", "Enabled"),
    ]

    # (attribute key, header label) for each column of the segments table.
    SEGMENT_COLUMNS = [
        ("name", "Name"),
        ("side", "Side"),
        ("start_channel", "Start Ch"),
        ("led_count", "LED Count"),
        ("orientation_rad", "Orientation"),
        ("x0", "x0"),
        ("y0", "y0"),
        ("x1", "x1"),
        ("y1", "y1"),
        ("reverse", "Reverse"),
    ]

    # Header labels of the discovered-devices table; columns 5 and 6 hold buttons.
    DEVICE_COLUMNS = [
        "IP Address",
        "Host / mDNS",
        "WLED Name",
        "MAC Address",
        "Status",
        "Map",
        "Identify",
    ]

    def __init__(self, config: InfinityMirrorConfig, controller=None, parent: QWidget | None = None) -> None:
        """Build the dialog around a clone of *config*.

        Args:
            config: Mapping to edit; it is cloned, the original is not touched.
            controller: Optional object. When present the dialog reads its
                ``mapping_path`` and calls its ``replace_config`` after an
                assignment has been written to disk.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self.setWindowTitle("Mapping Settings")
        self.resize(1320, 900)

        self.controller = controller
        self.working_config = config.clone()
        self.result_config: InfinityMirrorConfig | None = None
        self._active_segment_tile_id: str | None = None
        self._device_table_refreshing = False
        self._active_device_key: str | None = None
        self._selected_assignment_tile_id: str | None = None
        self._scan_worker: NetworkScanWorker | None = None
        self._identify_workers: set[DeviceIdentifyWorker] = set()
        self.discovered_devices: list[DiscoveredWledDevice] = []
        self._last_scan_host_count = 0
        # Tile objects in the order they were last written into the tiles
        # table; row N of the table always belongs to entry N of this list.
        self._tile_row_order: list = []

        root_layout = QVBoxLayout(self)

        header_group = QGroupBox("Mapping")
        header_form = QFormLayout(header_group)
        self.name_edit = QLineEdit(self.working_config.name)
        self.rows_spin = QSpinBox()
        self.rows_spin.setRange(1, 24)
        self.rows_spin.setValue(self.working_config.logical_display.rows)
        self.cols_spin = QSpinBox()
        self.cols_spin.setRange(1, 24)
        self.cols_spin.setValue(self.working_config.logical_display.cols)
        header_form.addRow("Name", self.name_edit)
        header_form.addRow("Rows", self.rows_spin)
        header_form.addRow("Cols", self.cols_spin)
        root_layout.addWidget(header_group)

        self.tabs = QTabWidget()
        root_layout.addWidget(self.tabs, 1)

        self.tiles_tab = QWidget()
        self.network_tab = QWidget()
        self.segments_tab = QWidget()
        self.raw_tab = QWidget()
        self.tabs.addTab(self.tiles_tab, "Tiles")
        self.tabs.addTab(self.network_tab, "Network Mapping")
        self.tabs.addTab(self.segments_tab, "Segments")
        self.tabs.addTab(self.raw_tab, "Raw XML")

        self._build_tiles_tab()
        self._build_network_tab()
        self._build_segments_tab()
        self._build_raw_tab()
        self._populate_tiles_table()
        self._rebuild_segment_tile_combo()
        self._refresh_raw_xml()
        self._refresh_network_mapping_ui()

        # Connected only now so the handler never runs against a half-built
        # UI (adding the first tab already emits currentChanged).
        self.tabs.currentChanged.connect(self._handle_tab_changed)

        button_box = QDialogButtonBox(QDialogButtonBox.Save | QDialogButtonBox.Cancel)
        button_box.accepted.connect(self._save_and_accept)
        button_box.rejected.connect(self.reject)
        root_layout.addWidget(button_box)

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------

    def _build_tiles_tab(self) -> None:
        """Create the editable tiles table."""
        layout = QVBoxLayout(self.tiles_tab)
        self.tiles_table = QTableWidget(0, len(self.TILE_COLUMNS))
        self.tiles_table.setHorizontalHeaderLabels([label for _, label in self.TILE_COLUMNS])
        self.tiles_table.verticalHeader().setVisible(False)
        self.tiles_table.setAlternatingRowColors(True)
        self.tiles_table.setSelectionBehavior(QTableWidget.SelectRows)
        # Sorting stays off: rows are matched to tiles by position.
        self.tiles_table.setSortingEnabled(False)
        layout.addWidget(self.tiles_table)

    def _build_network_tab(self) -> None:
        """Create the assisted-mapping tab: preview on the left, devices on the right."""
        layout = QVBoxLayout(self.network_tab)
        layout.setSpacing(12)

        intro = QLabel(
            "Scan the local network for WLED controllers, flash one device at a time, "
            "then click the matching physical tile to save the assignment."
        )
        intro.setWordWrap(True)
        layout.addWidget(intro)

        hint = QLabel(
            "Assignments save immediately to the open mapping file when the current mapping validates. "
            "If the mapping cannot be saved yet, the assignment stays in this dialog and you will see a warning."
        )
        hint.setWordWrap(True)
        hint.setStyleSheet("color: #A8B3C7;")
        layout.addWidget(hint)

        content_row = QHBoxLayout()
        content_row.setSpacing(12)
        layout.addLayout(content_row, 1)

        preview_group = QGroupBox("Mapping Preview")
        preview_layout = QVBoxLayout(preview_group)
        preview_layout.setSpacing(8)
        self.assignment_workflow_label = QLabel("Scan the network to begin assisted mapping.")
        self.assignment_workflow_label.setWordWrap(True)
        preview_layout.addWidget(self.assignment_workflow_label)
        self.mapping_preview = MappingAssignmentPreview()
        self.mapping_preview.tileClicked.connect(self._handle_mapping_preview_click)
        preview_layout.addWidget(self.mapping_preview, 1)
        content_row.addWidget(preview_group, 1)

        device_group = QGroupBox("Discovered WLED Devices")
        device_layout = QVBoxLayout(device_group)
        device_layout.setSpacing(8)

        scan_row = QHBoxLayout()
        self.scan_button = QPushButton("Scan Network")
        self.scan_button.clicked.connect(self._scan_network)
        scan_row.addWidget(self.scan_button)
        self.scan_status_label = QLabel("Not scanned yet.")
        self.scan_status_label.setWordWrap(True)
        scan_row.addWidget(self.scan_status_label, 1)
        device_layout.addLayout(scan_row)

        self.mapping_summary_label = QLabel("No devices discovered.")
        self.mapping_summary_label.setWordWrap(True)
        self.mapping_summary_label.setStyleSheet("color: #C7D2E1;")
        device_layout.addWidget(self.mapping_summary_label)

        self.discovered_devices_table = QTableWidget(0, len(self.DEVICE_COLUMNS))
        self.discovered_devices_table.setHorizontalHeaderLabels(self.DEVICE_COLUMNS)
        self.discovered_devices_table.verticalHeader().setVisible(False)
        self.discovered_devices_table.setAlternatingRowColors(True)
        self.discovered_devices_table.setSelectionBehavior(QTableWidget.SelectRows)
        self.discovered_devices_table.setSortingEnabled(False)
        self.discovered_devices_table.itemSelectionChanged.connect(self._on_discovered_device_selection_changed)
        header = self.discovered_devices_table.horizontalHeader()
        # The Status column (4) absorbs spare width; all others fit their content.
        for column in (0, 1, 2, 3, 5, 6):
            header.setSectionResizeMode(column, QHeaderView.ResizeToContents)
        header.setSectionResizeMode(4, QHeaderView.Stretch)
        device_layout.addWidget(self.discovered_devices_table, 1)

        self.assignment_feedback_label = QLabel("Select a device or press Identify, then click the matching tile.")
        self.assignment_feedback_label.setWordWrap(True)
        self.assignment_feedback_label.setStyleSheet("color: #A8B3C7;")
        device_layout.addWidget(self.assignment_feedback_label)

        content_row.addWidget(device_group, 1)

    def _build_segments_tab(self) -> None:
        """Create the tile selector, add/remove buttons and the segments table."""
        layout = QVBoxLayout(self.segments_tab)

        top_row = QHBoxLayout()
        top_row.addWidget(QLabel("Tile"))
        self.segment_tile_combo = QComboBox()
        self.segment_tile_combo.currentIndexChanged.connect(self._on_segment_tile_changed)
        top_row.addWidget(self.segment_tile_combo, 1)
        self.add_segment_button = QPushButton("Add Segment")
        self.remove_segment_button = QPushButton("Remove Selected")
        top_row.addWidget(self.add_segment_button)
        top_row.addWidget(self.remove_segment_button)
        layout.addLayout(top_row)

        self.segments_table = QTableWidget(0, len(self.SEGMENT_COLUMNS))
        self.segments_table.setHorizontalHeaderLabels([label for _, label in self.SEGMENT_COLUMNS])
        self.segments_table.verticalHeader().setVisible(False)
        self.segments_table.setAlternatingRowColors(True)
        self.segments_table.setSelectionBehavior(QTableWidget.SelectRows)
        layout.addWidget(self.segments_table, 1)

        self.add_segment_button.clicked.connect(self._add_segment_row)
        self.remove_segment_button.clicked.connect(self._remove_selected_segments)

    def _build_raw_tab(self) -> None:
        """Create the raw XML editor, its action buttons and the validation list."""
        layout = QVBoxLayout(self.raw_tab)

        button_row = QHBoxLayout()
        self.refresh_raw_button = QPushButton("Refresh From Tables")
        self.validate_raw_button = QPushButton("Validate XML")
        self.apply_raw_button = QPushButton("Apply XML To Tables")
        button_row.addWidget(self.refresh_raw_button)
        button_row.addWidget(self.validate_raw_button)
        button_row.addWidget(self.apply_raw_button)
        button_row.addStretch(1)
        layout.addLayout(button_row)

        self.raw_editor = QPlainTextEdit()
        self.raw_editor.setLineWrapMode(QPlainTextEdit.NoWrap)
        self.raw_editor.document().setModified(False)
        layout.addWidget(self.raw_editor, 1)

        layout.addWidget(QLabel("Validation"))
        self.error_list = QListWidget()
        layout.addWidget(self.error_list)

        # The button goes through a guarded wrapper so that an unparsable
        # table cell is listed as an error instead of raising out of the slot.
        self.refresh_raw_button.clicked.connect(self._on_refresh_raw_clicked)
        self.validate_raw_button.clicked.connect(self._validate_raw_xml)
        self.apply_raw_button.clicked.connect(self._apply_raw_xml)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _checkable_item(checked: bool) -> QTableWidgetItem:
        """Return a selectable, enabled, user-checkable item in the given state."""
        item = QTableWidgetItem()
        item.setFlags(item.flags() | Qt.ItemIsUserCheckable | Qt.ItemIsEnabled | Qt.ItemIsSelectable)
        item.setCheckState(Qt.Checked if checked else Qt.Unchecked)
        return item

    @staticmethod
    def _column_index(columns, key: str) -> int:
        """Return the position of *key* in a ``(key, label)`` column list.

        Raises:
            StopIteration: if *key* is not one of the column keys.
        """
        return next(index for index, (field, _) in enumerate(columns) if field == key)

    @staticmethod
    def _parse_number(text: str, converter, description: str):
        """Convert *text* with *converter* (``int`` or ``float``).

        Raises:
            ValueError: naming *description* and the offending text, so the
                user can find the cell that needs fixing.
        """
        try:
            return converter(text)
        except ValueError:
            raise ValueError(f"{description}: {text!r} is not a valid number.") from None

    @staticmethod
    def _ip_sort_key(device: DiscoveredWledDevice) -> tuple:
        """Sort key ordering dotted-decimal addresses numerically.

        Addresses that are not purely numeric dotted strings sort after the
        numeric ones, by their text, instead of raising.
        """
        address = device.ip_address or ""
        try:
            return (0, tuple(int(part) for part in address.split(".")), "")
        except ValueError:
            return (1, (), address)

    def _list_errors(self, exc: Exception) -> None:
        """Replace the validation list content with the message(s) of *exc*."""
        errors = exc.errors if isinstance(exc, MappingValidationError) else [str(exc)]
        self.error_list.clear()
        for error in errors:
            self.error_list.addItem(error)

    # ------------------------------------------------------------------
    # Tables <- config
    # ------------------------------------------------------------------

    def _populate_tiles_table(self) -> None:
        """Fill the tiles table from ``working_config`` and remember the row order."""
        tiles = list(self.working_config.sorted_tiles())
        self._tile_row_order = tiles
        self.tiles_table.setRowCount(len(tiles))
        for row, tile in enumerate(tiles):
            values = {
                "tile_id": tile.tile_id,
                "row": str(tile.row),
                "col": str(tile.col),
                "screen_name": tile.screen_name,
                "controller_ip": tile.controller_ip,
                "controller_name": tile.controller_name,
                "controller_host": tile.controller_host,
                "controller_mac": tile.controller_mac,
                "subnet": str(tile.subnet),
                "universe": str(tile.universe),
                "led_total": str(tile.led_total),
                "brightness_factor": f"{tile.brightness_factor:.3f}",
            }
            for column, (key, _) in enumerate(self.TILE_COLUMNS):
                if key == "enabled":
                    item = self._checkable_item(tile.enabled)
                else:
                    item = QTableWidgetItem(str(values[key]))
                self.tiles_table.setItem(row, column, item)
        self.tiles_table.resizeColumnsToContents()

    def _rebuild_segment_tile_combo(self) -> None:
        """Refill the tile selector, keep the active tile if possible, reload segments."""
        tiles = self.working_config.sorted_tiles()
        combo = self.segment_tile_combo
        # Signals are blocked so refilling does not trigger a segment sync.
        combo.blockSignals(True)
        combo.clear()
        for tile in tiles:
            combo.addItem(tile.tile_id, tile.tile_id)
        current_tile_id = self._active_segment_tile_id or (tiles[0].tile_id if self.working_config.tiles else None)
        if current_tile_id:
            # findData returns -1 for a tile that no longer exists; fall back to the first entry.
            combo.setCurrentIndex(max(0, combo.findData(current_tile_id)))
        self._active_segment_tile_id = combo.currentData()
        combo.blockSignals(False)
        self._populate_segments_table()

    def _populate_segments_table(self) -> None:
        """Fill the segments table from the tile currently selected in the combo."""
        tile = self._active_tile_for_segments()
        self.segments_table.setRowCount(0)
        if tile is None:
            return
        self.segments_table.setRowCount(len(tile.segments))
        for row, segment in enumerate(tile.segments):
            values = {
                "name": segment.name,
                "side": segment.side,
                "start_channel": str(segment.start_channel),
                "led_count": str(segment.led_count),
                "orientation_rad": f"{segment.orientation_rad:.5f}",
                "x0": f"{segment.x0:.3f}",
                "y0": f"{segment.y0:.3f}",
                "x1": f"{segment.x1:.3f}",
                "y1": f"{segment.y1:.3f}",
            }
            for column, (key, _) in enumerate(self.SEGMENT_COLUMNS):
                if key == "reverse":
                    item = self._checkable_item(segment.reverse)
                else:
                    item = QTableWidgetItem(str(values[key]))
                self.segments_table.setItem(row, column, item)
        self.segments_table.resizeColumnsToContents()

    def _active_tile_for_segments(self):
        """Return the tile selected in the segment combo, or ``None``."""
        return self.working_config.tile_lookup().get(self.segment_tile_combo.currentData())

    def _on_segment_tile_changed(self) -> None:
        """Store the edited segments of the previous tile, then show the new tile's."""
        previous_tile_id = self._active_segment_tile_id
        try:
            self._sync_segments_table(previous_tile_id)
        except ValueError as exc:
            # The table holds a value that cannot be parsed: go back to the
            # previous tile so the edits stay visible and can be corrected.
            combo = self.segment_tile_combo
            combo.blockSignals(True)
            previous_index = combo.findData(previous_tile_id)
            if previous_index >= 0:
                combo.setCurrentIndex(previous_index)
            combo.blockSignals(False)
            QMessageBox.warning(
                self,
                "Mapping Error",
                f"Please resolve the current segment values before switching tiles.\n\n{exc}",
            )
            return
        self._active_segment_tile_id = self.segment_tile_combo.currentData()
        self._populate_segments_table()

    def _add_segment_row(self) -> None:
        """Append a segment row pre-filled with default values."""
        defaults = {
            "name": "New Segment",
            "side": "left",
            "start_channel": "1",
            "led_count": "1",
            "orientation_rad": "0.0",
            "x0": "0.0",
            "y0": "0.0",
            "x1": "0.0",
            "y1": "0.0",
        }
        row = self.segments_table.rowCount()
        self.segments_table.insertRow(row)
        for column, (key, _) in enumerate(self.SEGMENT_COLUMNS):
            if key == "reverse":
                item = self._checkable_item(False)
            else:
                item = QTableWidgetItem(defaults[key])
            self.segments_table.setItem(row, column, item)

    def _remove_selected_segments(self) -> None:
        """Remove every row that contains a selected item."""
        selected_rows = {item.row() for item in self.segments_table.selectedItems()}
        # Bottom-up so earlier removals do not shift the remaining indices.
        for row in sorted(selected_rows, reverse=True):
            self.segments_table.removeRow(row)

    # ------------------------------------------------------------------
    # Raw XML tab
    # ------------------------------------------------------------------

    def _refresh_raw_xml(self) -> None:
        """Write the tables into the config and show the resulting XML.

        Raises:
            ValueError: if a numeric table cell cannot be parsed.
        """
        self._sync_tables_to_config()
        self._set_raw_xml_snapshot()
        self.error_list.clear()

    def _on_refresh_raw_clicked(self) -> None:
        """Button slot for "Refresh From Tables"; lists table errors instead of raising."""
        try:
            self._refresh_raw_xml()
        except (MappingValidationError, ValueError) as exc:
            self._list_errors(exc)

    def _set_raw_xml_snapshot(self) -> None:
        """Load the XML of ``working_config`` into the editor and mark it unmodified."""
        self.raw_editor.setPlainText(config_to_xml_string(self.working_config))
        self.raw_editor.document().setModified(False)

    def _validate_raw_xml(self) -> None:
        """Validate the editor text and list either a success line or the errors."""
        self.error_list.clear()
        try:
            config = load_config_from_string(self.raw_editor.toPlainText(), validate=True)
        except (MappingValidationError, ValueError) as exc:
            self._list_errors(exc)
            return
        self.error_list.addItem(f"XML valid. {len(config.tiles)} tiles loaded.")

    def _apply_raw_xml(self) -> None:
        """Replace ``working_config`` with the editor's XML and refresh every view."""
        self.error_list.clear()
        try:
            loaded = load_config_from_string(self.raw_editor.toPlainText(), validate=True)
        except (MappingValidationError, ValueError) as exc:
            self._list_errors(exc)
            return
        self.working_config = loaded
        self.name_edit.setText(loaded.name)
        self.rows_spin.setValue(loaded.logical_display.rows)
        self.cols_spin.setValue(loaded.logical_display.cols)
        self._populate_tiles_table()
        self._rebuild_segment_tile_combo()
        self._refresh_network_mapping_ui()
        self.raw_editor.document().setModified(False)
        self.error_list.addItem("Applied XML to editable tables.")

    # ------------------------------------------------------------------
    # Tab switching and tables -> config
    # ------------------------------------------------------------------

    def _handle_tab_changed(self, index: int) -> None:
        """Bring the newly shown tab up to date with edits made elsewhere.

        If the tables contain values that cannot be written to the config,
        the dialog returns to the Tiles tab and shows a warning.
        """
        target = self.tabs.widget(index)
        try:
            if target is self.network_tab:
                self._sync_general_fields()
                self._sync_tiles_table()
                self._sync_segments_table()
                self._refresh_network_mapping_ui()
            elif target is self.segments_tab:
                # Store pending segment edits first: rebuilding the combo
                # reloads the table from the config and would drop them.
                try:
                    self._sync_segments_table()
                except ValueError:
                    # Leave the unparsable edits on screen untouched so they
                    # can be corrected here; nothing is reloaded.
                    return
                self._sync_tiles_table()
                self._rebuild_segment_tile_combo()
            elif target is self.raw_tab and not self.raw_editor.document().isModified():
                self._refresh_raw_xml()
        except (MappingValidationError, ValueError, KeyError, StopIteration) as exc:
            # Signals are blocked so switching back does not re-enter this handler.
            self.tabs.blockSignals(True)
            self.tabs.setCurrentWidget(self.tiles_tab)
            self.tabs.blockSignals(False)
            QMessageBox.warning(
                self,
                "Mapping Error",
                f"Please resolve the current tile or segment values before switching tabs.\n\n{exc}",
            )

    def _sync_tables_to_config(self) -> None:
        """Write the header fields, tiles table and segments table into the config."""
        self._sync_general_fields()
        self._sync_tiles_table()
        self._sync_segments_table()

    def _sync_general_fields(self) -> None:
        """Copy name, rows and cols into the config; a blank name keeps the old one."""
        self.working_config.name = self.name_edit.text().strip() or self.working_config.name
        self.working_config.logical_display.rows = self.rows_spin.value()
        self.working_config.logical_display.cols = self.cols_spin.value()

    def _sync_tiles_table(self) -> None:
        """Write the tiles table back into the tile objects it was filled from.

        Every cell is parsed before any tile is modified, so a bad value
        leaves the configuration exactly as it was.

        Raises:
            ValueError: if a numeric cell does not parse.
        """
        # Rows are paired with the tiles recorded at populate time; sorting
        # again here could pair a row with a different tile after an edit.
        tiles = list(self._tile_row_order)
        labels = dict(self.TILE_COLUMNS)
        text_fields = (
            "tile_id",
            "screen_name",
            "controller_ip",
            "controller_name",
            "controller_host",
            "controller_mac",
        )
        numeric_fields = (
            ("row", int),
            ("col", int),
            ("subnet", int),
            ("universe", int),
            ("led_total", int),
            ("brightness_factor", float),
        )

        parsed_rows = []
        for row in range(len(tiles)):
            values = {key: self._text(row, key) for key in text_fields}
            for key, converter in numeric_fields:
                values[key] = self._parse_number(
                    self._text(row, key),
                    converter,
                    f"Tiles row {row + 1}, {labels[key]}",
                )
            values["enabled"] = self._check(row, "enabled")
            parsed_rows.append(values)

        for tile, values in zip(tiles, parsed_rows):
            for key, value in values.items():
                setattr(tile, key, value)
        self.working_config.tiles = tiles

    def _sync_segments_table(self, tile_id: str | None = None) -> None:
        """Replace one tile's segments with the rows of the segments table.

        Args:
            tile_id: Tile to write to. When ``None`` the active segment tile
                is used, falling back to the combo's current entry.

        Does nothing when the tile cannot be found.

        Raises:
            ValueError: if a numeric cell does not parse; the tile is left unchanged.
        """
        lookup_id = tile_id if tile_id is not None else self._active_segment_tile_id or self.segment_tile_combo.currentData()
        tile = self.working_config.tile_lookup().get(lookup_id)
        if tile is None:
            return

        labels = dict(self.SEGMENT_COLUMNS)
        numeric_fields = (
            ("start_channel", int),
            ("led_count", int),
            ("orientation_rad", float),
            ("x0", float),
            ("y0", float),
            ("x1", float),
            ("y1", float),
        )
        segments: list[SegmentConfig] = []
        for row in range(self.segments_table.rowCount()):
            numbers = {
                key: self._parse_number(
                    self._segment_text(row, key),
                    converter,
                    f"Segments row {row + 1}, {labels[key]}",
                )
                for key, converter in numeric_fields
            }
            segments.append(
                SegmentConfig(
                    name=self._segment_text(row, "name"),
                    side=self._segment_text(row, "side"),
                    reverse=self._segment_check(row, "reverse"),
                    **numbers,
                )
            )
        tile.segments = segments

    # ------------------------------------------------------------------
    # Network scan
    # ------------------------------------------------------------------

    def _scan_network(self) -> None:
        """Store pending table edits, reset the device list and start a scan."""
        try:
            self._sync_general_fields()
            self._sync_tiles_table()
            self._sync_segments_table()
        except (MappingValidationError, ValueError, KeyError, StopIteration) as exc:
            # This is a button slot: report the problem instead of raising.
            QMessageBox.warning(
                self,
                "Mapping Error",
                f"Please resolve the current tile or segment values before scanning.\n\n{exc}",
            )
            return

        self.discovered_devices = []
        self._active_device_key = None
        self._selected_assignment_tile_id = None
        self._scan_worker = NetworkScanWorker(self.working_config)
        self._scan_worker.progress.connect(self._on_scan_progress)
        self._scan_worker.finished.connect(self._on_scan_finished)

        # The button stays disabled until _on_scan_finished re-enables it.
        self.scan_button.setEnabled(False)
        self.scan_status_label.setText("Scanning local subnets for WLED devices...")
        self.assignment_feedback_label.setText("Scanning the network. Results will appear below as devices respond.")
        self._refresh_network_mapping_ui()
        self._scan_worker.start()

    def _on_scan_progress(self, completed: int, total: int, device: DiscoveredWledDevice | None) -> None:
        """Update the progress text and add a newly found device to the table."""
        self.scan_status_label.setText(f"Scanning {completed}/{total} hosts...")
        if device is None:
            return
        new_key = self._device_key(device)
        if any(self._device_key(existing) == new_key for existing in self.discovered_devices):
            return
        self.discovered_devices.append(device)
        self.discovered_devices.sort(key=self._ip_sort_key)
        if self._active_device() is None:
            self._active_device_key = self._device_key(self.discovered_devices[0])
        self._refresh_network_mapping_ui()

    def _on_scan_finished(self, devices: list[DiscoveredWledDevice], error_message: str, host_count: int) -> None:
        """Re-enable scanning and show either the error or the final device list."""
        self.scan_button.setEnabled(True)
        self._scan_worker = None
        self._last_scan_host_count = host_count

        if error_message:
            self.scan_status_label.setText("Network scan failed.")
            self.assignment_feedback_label.setText(error_message)
            QMessageBox.warning(self, "Network Scan Error", error_message)
            return

        # Same ordering as during progress updates, so rows do not jump
        # around when the scan completes.
        self.discovered_devices = sorted(devices, key=self._ip_sort_key)
        if self._active_device() is None and self.discovered_devices:
            self._active_device_key = self._device_key(self.discovered_devices[0])
        if not self.discovered_devices:
            self.scan_status_label.setText(f"No WLED devices found after scanning {host_count} hosts.")
            self.assignment_feedback_label.setText("No WLED devices were detected. Check the subnet, power, and network connectivity.")
        else:
            self.scan_status_label.setText(f"Found {len(self.discovered_devices)} WLED device(s) across {host_count} hosts.")
        self._refresh_network_mapping_ui()

    # ------------------------------------------------------------------
    # Device selection, identify and assignment
    # ------------------------------------------------------------------

    def _on_discovered_device_selection_changed(self) -> None:
        """Make the device of the newly selected table row the active device."""
        if self._device_table_refreshing:
            # The selection changed because the table is being rebuilt.
            return
        row = self.discovered_devices_table.currentRow()
        if row < 0:
            return
        item = self.discovered_devices_table.item(row, 0)
        if item is None:
            return
        device_key = item.data(Qt.UserRole)
        if isinstance(device_key, str):
            self._select_device_for_assignment(device_key, select_table=False)

    def _select_device_for_assignment(self, device_key: str, *, select_table: bool = True) -> None:
        """Make *device_key* the active device and refresh the network tab.

        If the device is already mapped, its tile becomes the highlighted
        tile; otherwise the current highlight is kept.

        Args:
            device_key: Key as returned by ``_device_key``.
            select_table: Also select the device's row in the table.
        """
        self._active_device_key = device_key
        active_device = self._active_device()
        mapped_tile = find_tile_for_device(self.working_config, active_device) if active_device is not None else None
        if mapped_tile is not None:
            self._selected_assignment_tile_id = mapped_tile.tile_id
        if select_table:
            self._select_device_row(device_key)
        self._refresh_network_mapping_ui()

    def _select_device_row(self, device_key: str) -> None:
        """Select the first table row whose stored key equals *device_key*."""
        for row in range(self.discovered_devices_table.rowCount()):
            item = self.discovered_devices_table.item(row, 0)
            if item is not None and item.data(Qt.UserRole) == device_key:
                self.discovered_devices_table.selectRow(row)
                return

    def _identify_device(self, device_key: str) -> None:
        """Activate the device and start a background identify request for it."""
        device = self._device_for_key(device_key)
        if device is None:
            return

        self._select_device_for_assignment(device_key)
        worker = DeviceIdentifyWorker(device)
        # Held in a set so the worker stays referenced until it reports back.
        self._identify_workers.add(worker)
        worker.finished.connect(self._on_identify_finished)
        self.assignment_feedback_label.setText(
            f"Identifying {device.ip_address}. Watch for the red pulse, then click the matching tile."
        )
        worker.start()

    def _on_identify_finished(self, device: DiscoveredWledDevice, error_message: str) -> None:
        """Release the finished worker and report the identify outcome."""
        for worker in list(self._identify_workers):
            if worker._device == device:
                self._identify_workers.discard(worker)
                break
        if error_message:
            self.assignment_feedback_label.setText(error_message)
            QMessageBox.warning(self, "Identify Failed", error_message)
        else:
            mapped_tile = find_tile_for_device(self.working_config, device)
            if mapped_tile is not None:
                self._selected_assignment_tile_id = mapped_tile.tile_id
            self.assignment_feedback_label.setText(
                f"{device.ip_address} pulsed successfully. Click the matching tile to store the assignment."
            )
        self._refresh_network_mapping_ui()

    def _handle_mapping_preview_click(self, tile_id: str) -> None:
        """Assign the active device to the clicked tile, or just highlight the tile."""
        self._selected_assignment_tile_id = tile_id
        active_device = self._active_device()
        if active_device is None:
            self.assignment_feedback_label.setText(f"{tile_id} selected. Choose a device row or press Identify first.")
            self._refresh_network_mapping_ui()
            return
        self._assign_active_device_to_tile(active_device, tile_id)

    def _assign_active_device_to_tile(self, device: DiscoveredWledDevice, tile_id: str) -> None:
        """Assign *device* to *tile_id*, try to save, and move to the next device."""
        target_tile = self.working_config.tile_lookup().get(tile_id)
        if target_tile is None:
            self.assignment_feedback_label.setText(f"Tile {tile_id} no longer exists in the current mapping.")
            return

        # Captured before the assignment so the summary can say what changed.
        previous_tile = find_tile_for_device(self.working_config, device)
        displaced_tile_text = ""
        if target_tile.controller_ip or target_tile.controller_mac:
            if not tile_matches_device(target_tile, device):
                displaced_tile_text = target_tile.controller_ip or target_tile.controller_name or target_tile.controller_mac

        assign_device_to_tile(self.working_config, device, tile_id)
        self._selected_assignment_tile_id = tile_id
        self._populate_tiles_table()
        # Unapplied manual XML edits are never overwritten.
        if not self.raw_editor.document().isModified():
            self._set_raw_xml_snapshot()

        persisted, persistence_message = self._persist_working_mapping()

        summary_parts = [f"{device.ip_address} -> {tile_id}"]
        if previous_tile is not None and previous_tile.tile_id != tile_id:
            summary_parts.append(f"moved from {previous_tile.tile_id}")
        if displaced_tile_text:
            summary_parts.append(f"replaced {displaced_tile_text}")
        summary = ", ".join(summary_parts)

        if persisted:
            self.assignment_feedback_label.setText(f"Saved {summary}. {persistence_message}")
        else:
            self.assignment_feedback_label.setText(f"Stored {summary} in the dialog. {persistence_message}")
            QMessageBox.warning(self, "Assignment Not Saved Yet", persistence_message)

        self._refresh_network_mapping_ui()
        self._select_next_unmapped_device()

    def _persist_working_mapping(self) -> tuple[bool, str]:
        """Try to write ``working_config`` to the open mapping file.

        Returns:
            ``(True, message)`` after a successful write, otherwise
            ``(False, reason)``. Nothing is raised for the handled failures.
        """
        if self.raw_editor.document().isModified():
            return (
                False,
                "Raw XML has unapplied changes. Apply or discard those edits before assisted assignments can be written to disk.",
            )

        # The controller's path wins over the path stored in the config.
        target_path = None
        if self.controller is not None and self.controller.mapping_path is not None:
            target_path = self.controller.mapping_path
        elif self.working_config.file_path is not None:
            target_path = self.working_config.file_path

        if target_path is None:
            return False, "No mapping file is currently open. Use Save or Save As after closing Mapping Settings."

        validation = validate_config(self.working_config)
        if not validation.is_valid:
            return False, "The mapping is currently invalid and could not be saved: " + "; ".join(validation.errors)

        try:
            save_config(self.working_config, target_path)
        except MappingValidationError as exc:
            return False, "The mapping could not be saved: " + "; ".join(exc.errors)
        except (OSError, ValueError) as exc:
            return False, f"The mapping file could not be written: {exc}"

        if self.controller is not None:
            # The controller receives its own clone of what was written.
            self.controller.replace_config(self.working_config.clone(), path=target_path)
        return True, f"Wrote {target_path.name}."

    def _select_next_unmapped_device(self) -> None:
        """Activate the first discovered device without a tile, if there is one."""
        for device in self.discovered_devices:
            if find_tile_for_device(self.working_config, device) is None:
                self._select_device_for_assignment(self._device_key(device))
                return
        self._refresh_network_mapping_ui()

    # ------------------------------------------------------------------
    # Network tab rendering
    # ------------------------------------------------------------------

    def _refresh_network_mapping_ui(self) -> None:
        """Recompute the summary and workflow texts, the preview and the device table."""
        active_device = self._active_device()
        devices = self.discovered_devices
        mapped_count = sum(1 for device in devices if find_tile_for_device(self.working_config, device) is not None)
        # Tiles that carry an assignment matching none of the discovered devices.
        stale_tile_count = sum(
            1
            for tile in self.working_config.sorted_tiles()
            if (tile.controller_ip.strip() or tile.controller_mac.strip())
            and not any(tile_matches_device(tile, device) for device in devices)
        )
        unmapped_count = max(0, len(devices) - mapped_count)

        if not devices:
            summary = "No devices discovered yet."
        else:
            summary = f"{len(devices)} discovered | {mapped_count} mapped | {unmapped_count} unmapped"
            if stale_tile_count:
                summary += f" | {stale_tile_count} saved assignment(s) not seen in this scan"
        self.mapping_summary_label.setText(summary)

        if active_device is None:
            self.assignment_workflow_label.setText("Select a device row or press Identify, then click the matching tile.")
        else:
            mapped_tile = find_tile_for_device(self.working_config, active_device)
            if mapped_tile is None:
                self.assignment_workflow_label.setText(
                    f"Active device: {active_device.ip_address}. Click the physical tile that just flashed."
                )
            else:
                self.assignment_workflow_label.setText(
                    f"Active device: {active_device.ip_address} is currently mapped to {mapped_tile.tile_id}. "
                    "Click a different tile to reassign it."
                )

        self.mapping_preview.set_assignment_state(
            self.working_config,
            devices,
            active_device=active_device,
            selected_tile_id=self._selected_assignment_tile_id,
        )
        self._refresh_discovered_devices_table()

    def _refresh_discovered_devices_table(self) -> None:
        """Rebuild the device table rows, their colours and their action buttons."""
        table = self.discovered_devices_table
        # Flag read by the selection slot so this rebuild is not treated as user input.
        self._device_table_refreshing = True
        try:
            table.setRowCount(len(self.discovered_devices))
            for row, device in enumerate(self.discovered_devices):
                mapped_tile = find_tile_for_device(self.working_config, device)
                status = "Unmapped"
                if mapped_tile is not None:
                    saved_ip = mapped_tile.controller_ip.strip()
                    if saved_ip and saved_ip != device.ip_address:
                        status = f"Mapped to {mapped_tile.tile_id} (saved IP {mapped_tile.controller_ip})"
                    else:
                        status = f"Mapped to {mapped_tile.tile_id}"

                values = [
                    device.ip_address,
                    device.hostname,
                    device.instance_name,
                    device.mac_address,
                    status,
                ]
                device_key = self._device_key(device)
                for column, value in enumerate(values):
                    # Missing device fields are shown as empty text.
                    item = QTableWidgetItem("" if value is None else str(value))
                    item.setData(Qt.UserRole, device_key)
                    if device_key == self._active_device_key:
                        item.setBackground(Qt.darkBlue)
                        item.setForeground(Qt.white)
                    elif mapped_tile is not None:
                        item.setBackground(Qt.darkGreen)
                    table.setItem(row, column, item)

                # key is bound as a default so each button keeps its own device.
                map_button = QPushButton("Map This")
                map_button.clicked.connect(lambda _checked=False, key=device_key: self._select_device_for_assignment(key))
                table.setCellWidget(row, 5, map_button)

                identify_button = QPushButton("Identify")
                identify_button.clicked.connect(lambda _checked=False, key=device_key: self._identify_device(key))
                table.setCellWidget(row, 6, identify_button)

            table.resizeRowsToContents()
            if self._active_device_key is not None:
                self._select_device_row(self._active_device_key)
        finally:
            self._device_table_refreshing = False

    # ------------------------------------------------------------------
    # Lookups and cell access
    # ------------------------------------------------------------------

    def _device_for_key(self, device_key: str | None) -> DiscoveredWledDevice | None:
        """Return the discovered device with this key, or ``None``."""
        if not device_key:
            return None
        for device in self.discovered_devices:
            if self._device_key(device) == device_key:
                return device
        return None

    def _active_device(self) -> DiscoveredWledDevice | None:
        """Return the currently active discovered device, or ``None``."""
        return self._device_for_key(self._active_device_key)

    def _device_key(self, device: DiscoveredWledDevice) -> str:
        """Return the device's MAC address, or its IP address when the MAC is empty."""
        return device.mac_address or device.ip_address

    def _text(self, row: int, key: str) -> str:
        """Return the stripped text of a tiles-table cell (``""`` if the cell is empty)."""
        item = self.tiles_table.item(row, self._column_index(self.TILE_COLUMNS, key))
        return item.text().strip() if item else ""

    def _segment_text(self, row: int, key: str) -> str:
        """Return the stripped text of a segments-table cell (``""`` if the cell is empty)."""
        item = self.segments_table.item(row, self._column_index(self.SEGMENT_COLUMNS, key))
        return item.text().strip() if item else ""

    def _check(self, row: int, key: str) -> bool:
        """Return whether a tiles-table checkbox cell is checked."""
        item = self.tiles_table.item(row, self._column_index(self.TILE_COLUMNS, key))
        return bool(item and item.checkState() == Qt.Checked)

    def _segment_check(self, row: int, key: str) -> bool:
        """Return whether a segments-table checkbox cell is checked."""
        item = self.segments_table.item(row, self._column_index(self.SEGMENT_COLUMNS, key))
        return bool(item and item.checkState() == Qt.Checked)

    # ------------------------------------------------------------------
    # Save
    # ------------------------------------------------------------------

    def _save_and_accept(self) -> None:
        """Validate the edited mapping and accept the dialog.

        Modified raw XML takes precedence over the tables. On any validation
        problem the errors are listed on the Raw XML tab and the dialog stays open.
        """
        try:
            if self.raw_editor.document().isModified():
                self.working_config = load_config_from_string(self.raw_editor.toPlainText(), validate=True)
            else:
                self._sync_tables_to_config()
            result = validate_config(self.working_config)
            if not result.is_valid:
                raise MappingValidationError(result.errors)
        except (MappingValidationError, ValueError) as exc:
            self._list_errors(exc)
            self.tabs.setCurrentWidget(self.raw_tab)
            QMessageBox.warning(self, "Validation Error", "Please resolve the validation errors before saving.")
            return

        self.result_config = self.working_config
        self.accept()
|