From ded39ec1587c43c07098cfa1c02a71ef03c928ca Mon Sep 17 00:00:00 2001 From: Jan Date: Thu, 7 Aug 2025 03:10:53 +0200 Subject: [PATCH] MainCode/adalm1000_logger.py aktualisiert Signal disconnect warning: disconnect() failed between 'update_signal' and all its connections still problems when switching D --- MainCode/adalm1000_logger.py | 775 ++++++++++++++++++++--------------- 1 file changed, 454 insertions(+), 321 deletions(-) diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py index e42d501..c39166f 100644 --- a/MainCode/adalm1000_logger.py +++ b/MainCode/adalm1000_logger.py @@ -20,6 +20,7 @@ from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread from PyQt5.QtGui import QDoubleValidator from PyQt5 import sip import pysmu +from pysmu import Session class DeviceManager: def __init__(self, dev): @@ -27,6 +28,9 @@ class DeviceManager: self.serial = dev.serial self.measurement_thread = None self.is_running = False + self.is_recording = False + self.log_file = None + self.log_writer = None # Datenpuffer max_data_points = 36000 @@ -58,23 +62,53 @@ class DeviceManager: self.consecutive_read_errors = 0 # Track read failures self.max_consecutive_errors = 5 # Threshold before reset + + self.status_colors = { + "connected": "green", + "disconnected": "red", + "error": "orange", + } def handle_read_error(self, increment=1): - """Handle read errors with automatic reset""" + """Enhanced device recovery with proper session handling""" self.consecutive_read_errors += increment if self.consecutive_read_errors >= self.max_consecutive_errors: try: - print("Resetting device due to persistent errors") - self.dev.session.end() - time.sleep(0.5) - self.dev.session = PatchedSession() - self.dev.session.start(0) - return True # Recovery attempted + print("Attempting device recovery...") + + # 1. First try soft reset + try: + if hasattr(self.dev, 'reset'): + self.dev.reset() + time.sleep(0.5) + return True + except Exception as e: + print(f"Soft reset failed: {e}") + + # 2. Full reinitialization + global_session = pysmu.Session() + devices = global_session.devices + + if not devices: + print("No devices found after rescan") + return False + + # Find our device by serial + for new_dev in devices: + if new_dev.serial == self.serial: + self.dev = new_dev + self.consecutive_read_errors = 0 + print("Device reinitialized successfully") + return True + + print("Original device not found after rescan") + return False + except Exception as e: - print(f"Device reset failed: {e}") - return False # Unrecoverable error - return True # Under threshold - continue + print(f"Device recovery failed: {e}") + return False + return True def start_measurement(self, interval=0.1): self.stop_measurement() # Ensure any existing thread is stopped @@ -141,15 +175,16 @@ class MeasurementThread(QThread): def stop(self): self._running = False - if not self.wait(500): # Wait up to 500ms for clean shutdown - self.terminate() + try: + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) + except Exception as e: + print(f"Error stopping device: {e}") def run(self): - """Continuous measurement loop with enhanced error handling""" + """Measurement loop with enhanced recovery""" self._running = True - self.start_time = time.time() # This gets reset when mode changes - consecutive_errors = 0 - max_consecutive_errors = 5 + self.start_time = time.time() while self._running: try: @@ -208,25 +243,15 @@ class MeasurementThread(QThread): except DeviceDisconnectedError as e: self.error_signal.emit(f"Device disconnected: {str(e)}") - break - except ValueError as e: - # Skip invalid measurements but log first occurrence - if consecutive_errors == 0: - self.error_signal.emit(f"Measurement error: {str(e)}") - consecutive_errors += 1 - time.sleep(0.1) + if not self.parent_manager.handle_read_error(): + break # Stop if recovery failed + time.sleep(1) # Wait before retrying + except Exception as e: self.error_signal.emit(f"Read error: {str(e)}") - consecutive_errors += 1 + if not self.parent_manager.handle_read_error(): + break time.sleep(1) - - # Handle persistent errors - if consecutive_errors >= max_consecutive_errors: - if hasattr(self, 'parent_manager'): - if not self.parent_manager.handle_read_error(): - self.error_signal.emit("Critical error - stopping measurement") - break - consecutive_errors = 0 def set_direction(self, direction): """Set current direction (1 for source, -1 for sink)""" @@ -345,12 +370,10 @@ class TestSequenceWorker(QObject): time.sleep(1) def stop(self): - """Request the thread to stop""" self._running = False try: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) - self.device.channels['B'].mode = pysmu.Mode.HI_Z except Exception as e: print(f"Error stopping device: {e}") @@ -461,12 +484,10 @@ class DischargeWorker(QObject): self.device.channels['A'].constant(0) def stop(self): - """Request the thread to stop""" self._running = False try: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) - self.device.channels['B'].mode = pysmu.Mode.HI_Z except Exception as e: print(f"Error stopping device: {e}") @@ -539,29 +560,38 @@ class ChargeWorker(QObject): self.finished.emit() def stop(self): - """Request the thread to stop""" self._running = False try: + self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) except Exception as e: - print(f"Error stopping charge: {e}") + print(f"Error stopping device: {e}") class BatteryTester(QMainWindow): def __init__(self): self.plot_mutex = threading.Lock() super().__init__() - self.devices = {} # Dictionary DeviceManager-Instanzen + self.devices = {} self.active_device = None self.last_logged_phase = None - # Color scheme + # Color scheme - MUST BE DEFINED FIRST self.bg_color = "#2E3440" self.fg_color = "#D8DEE9" self.accent_color = "#5E81AC" self.warning_color = "#BF616A" self.success_color = "#A3BE8C" + # Status colors - MUST BE DEFINED BEFORE init_device() + self.status_colors = { + "connected": "green", + "disconnected": "red", + "error": "orange", + "active": self.accent_color, + "warning": self.warning_color + } + # Device and measurement state self.session_active = False self.measuring = False @@ -589,10 +619,17 @@ class BatteryTester(QMainWindow): self.cycle_count = 0 self.start_time = time.time() self.last_update_time = self.start_time + + self.capacity = 1.0 + self.c_rate = 0.1 + self.charge_cutoff = 1.43 + self.discharge_cutoff = 0.01 + self.rest_time = 0.5 # Initialize UI and device self.setup_ui() self.init_device() + self.current_mode = "Live Monitoring" # Default mode # Set window properties self.setWindowTitle("ADALM1000 - Battery Tester (Multi-Mode)") @@ -829,6 +866,9 @@ class BatteryTester(QMainWindow): QPushButton:checked {{ background-color: {self.warning_color}; }} + QPushButton:pressed {{ + background-color: {self.warning_color}; + }} QPushButton:disabled {{ background-color: #4C566A; color: #D8DEE9; @@ -933,7 +973,7 @@ class BatteryTester(QMainWindow): elif mode_name == "Charge Test": self.toggle_button.setText("START CHARGE") elif mode_name == "Live Monitoring": - self.toggle_button.setText("START") # Will be hidden anyway + self.toggle_button.setText("START") self.toggle_button.hide() # Reset button state @@ -942,7 +982,7 @@ class BatteryTester(QMainWindow): # Reset measurement state and zero the time if self.active_device: dev = self.active_device - dev.reset_data() # This clears all data buffers + dev.reset_data() # Reset the measurement thread's start time if hasattr(dev, 'measurement_thread'): @@ -960,6 +1000,18 @@ class BatteryTester(QMainWindow): self.status_bar.showMessage(f"Mode changed to {mode_name}") + # Update recording button + if mode_name == "Live Monitoring": + self.record_button.setVisible(True) + if self.active_device and self.active_device.is_recording: + self.record_button.setChecked(True) + self.record_button.setText("Stop Recording") + else: + self.record_button.setChecked(False) + self.record_button.setText("Start Recording") + else: + self.record_button.setVisible(False) + def reset_test(self): if not self.active_device: return @@ -975,17 +1027,15 @@ class BatteryTester(QMainWindow): def toggle_recording(self): """Toggle data recording in Live Monitoring mode""" - if self.record_button.isChecked(): - # Start recording + if not self.active_device: + return + + dev = self.active_device + + if not dev.is_recording: # Use device's recording state try: - # Reset previous data - self.reset_test() - - # Reset measurement timing - if hasattr(self, 'measurement_thread'): - self.measurement_thread.start_time = time.time() - if self.create_cycle_log_file(): + dev.is_recording = True # Set device's recording state self.record_button.setText("Stop Recording") self.status_bar.showMessage("Live recording started") # Ensure monitoring is running @@ -1002,8 +1052,9 @@ class BatteryTester(QMainWindow): else: # Stop recording try: - if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None: + if hasattr(self, 'current_cycle_file') and self.current_cycle_file: self.finalize_log_file() + dev.is_recording = False # Clear device's recording state self.record_button.setText("Start Recording") self.status_bar.showMessage("Live recording stopped") except Exception as e: @@ -1060,34 +1111,36 @@ class BatteryTester(QMainWindow): self.main_layout.addWidget(self.canvas, 1) def init_device(self): - """Initialize ADALM1000 devices with proper error handling""" + """Robust device initialization""" try: - # Cleanup previous session + # Close existing session if hasattr(self, 'session'): try: self.session.end() except: pass - - self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) + + # Create new session + self.session = Session(ignore_dataflow=True, queue_size=10000) + self.session.scan() - # Retry mechanism with progress feedback + # Retry mechanism retry_count = 0 while not self.session.devices and retry_count < 3: - self.status_bar.showMessage(f"Scanning for devices... (Attempt {retry_count + 1}/3)") - QApplication.processEvents() # Update UI + self.handle_device_connection(False, f"Scanning... (Attempt {retry_count+1}/3)") + QApplication.processEvents() time.sleep(1) - self.session.scan() # Manual scan + self.session.scan() retry_count += 1 if not self.session.devices: - self.handle_no_devices() + self.handle_device_connection(False, "No devices found") return self.session.start(0) self.devices = {} for dev in self.session.devices: - manager = DeviceManager(dev) + manager = DeviceManager(dev) # Should work now manager.start_measurement(interval=self.interval) self.devices[dev.serial] = manager @@ -1102,8 +1155,7 @@ class BatteryTester(QMainWindow): self.device_combo.setCurrentText(first_serial) self.session_active = True - self.connection_label.setText(f"Connected: {first_serial}") - self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") + self.handle_device_connection(True, f"Connected: {first_serial}") self.toggle_button.setEnabled(True) # Connect measurement signals @@ -1112,7 +1164,7 @@ class BatteryTester(QMainWindow): self.measurement_thread.error_signal.connect(self.handle_device_error) except Exception as e: - self.handle_device_error(f"Initialization failed: {str(e)}") + self.handle_device_connection(False, f"Initialization failed: {str(e)}") def handle_no_devices(self): """Handle case when no devices are found""" @@ -1120,13 +1172,38 @@ class BatteryTester(QMainWindow): self.active_device = None self.status_bar.showMessage("No ADALM1000 devices found") self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") - self.start_button.setEnabled(False) + self.toggle_button.setEnabled(False) self.device_combo.clear() # Show reconnect button self.reconnect_btn.setEnabled(True) self.reconnect_btn.setVisible(True) + def handle_device_connection(self, connected, message=None): + """Update connection status with proper coloring""" + if connected: + status = "connected" + if not message: + message = "Connected" + else: + status = "error" if "fail" in message.lower() else "disconnected" + if not message: + message = "Disconnected" + + color = self.status_colors.get(status, "orange") + self.connection_label.setText(message) + self.status_light.setStyleSheet(f""" + background-color: {color}; + border-radius: 10px; + """) + QApplication.processEvents() + + def check_connection(self): + """Periodically verify device connection""" + if not hasattr(self, 'session') or not self.session.devices: + self.handle_device_error("Device disconnected") + self.reconnect_device() + def request_usb_permissions(self): """Handle USB permission issues with user interaction""" msg = QMessageBox(self) @@ -1247,6 +1324,18 @@ class BatteryTester(QMainWindow): self.update_ui_from_active_device() self.status_bar.showMessage(f"Switched to device: {serial}") + # Update recording button state + if self.current_mode == "Live Monitoring": + self.record_button.setVisible(True) + if self.active_device.is_recording: # Check new device's state + self.record_button.setChecked(True) + self.record_button.setText("Stop Recording") + else: + self.record_button.setChecked(False) + self.record_button.setText("Start Recording") + else: + self.record_button.setVisible(False) + def update_ui_from_active_device(self): dev = self.active_device if not dev: @@ -1781,8 +1870,8 @@ class BatteryTester(QMainWindow): self.test_phase = "Discharge" self.phase_label.setText(self.test_phase) - self.start_button.setEnabled(False) - self.stop_button.setEnabled(True) + self.toggle_button.setChecked(True) + self.toggle_button.setText("STOP") self.status_bar.showMessage(f"Discharge started | Current: {test_current:.4f}A") # Create log file @@ -1887,8 +1976,8 @@ class BatteryTester(QMainWindow): self.test_phase = "Charge" self.phase_label.setText(self.test_phase) - self.start_button.setEnabled(False) - self.stop_button.setEnabled(True) + self.toggle_button.setChecked(True) + self.toggle_button.setText("STOP") self.status_bar.showMessage(f"Charge started @ {test_current:.3f}A to {self.charge_cutoff}V") # Create log file @@ -1953,11 +2042,6 @@ class BatteryTester(QMainWindow): self.test_running = True dev_manager.test_phase = "Live Monitoring" self.phase_label.setText(dev_manager.test_phase) - self.stop_button.setEnabled(True) - self.start_button.setEnabled(False) - - # Hide the toggle button in Live mode - self.toggle_button.hide() # Status self.status_bar.showMessage(f"Live monitoring started | Device: {dev.serial}") @@ -1986,15 +2070,15 @@ class BatteryTester(QMainWindow): # Generiere Timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - device_serial = self.active_device.serial if self.active_device else "unknown" + device_serial = self.active_device.serial[-2:] if self.active_device else "xx" # Generiere Dateinamen mit Seriennummer if self.current_mode == "Cycle Test": - self.filename = os.path.join(self.log_dir, f"battery_cycle_{device_serial}_{timestamp}.csv") + self.filename = os.path.join(self.log_dir, f"battery_cycle_{timestamp}_{device_serial}.csv") elif self.current_mode == "Discharge Test": - self.filename = os.path.join(self.log_dir, f"battery_discharge_{device_serial}_{timestamp}.csv") + self.filename = os.path.join(self.log_dir, f"battery_discharge_{timestamp}_{device_serial}.csv") else: # Live Monitoring - self.filename = os.path.join(self.log_dir, f"battery_live_{device_serial}_{timestamp}.csv") + self.filename = os.path.join(self.log_dir, f"battery_live_{timestamp}_{device_serial}.csv") # Öffne neue Datei try: @@ -2045,34 +2129,34 @@ class BatteryTester(QMainWindow): """Finalize the current log file""" if hasattr(self, 'current_cycle_file') and self.current_cycle_file: try: - test_current = self.c_rate * self.capacity - test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" + test_current = getattr(self, 'c_rate', 0) * getattr(self, 'capacity', 0) + test_conditions = getattr(self, 'test_conditions_input', lambda: "N/A").text() self.current_cycle_file.write("\n# TEST SUMMARY\n") self.current_cycle_file.write(f"# Test Parameters:\n") - self.current_cycle_file.write(f"# - Battery Capacity: {self.capacity} Ah\n") + self.current_cycle_file.write(f"# - Battery Capacity: {getattr(self, 'capacity', 'N/A')} Ah\n") - if self.current_mode != "Live Monitoring": - self.current_cycle_file.write(f"# - Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n") + if getattr(self, 'current_mode', 'Live Monitoring') != "Live Monitoring": + self.current_cycle_file.write(f"# - Test Current: {test_current:.4f} A (C/{1/getattr(self, 'c_rate', 1):.1f})\n") - if self.current_mode == "Cycle Test": - self.current_cycle_file.write(f"# - Charge Cutoff: {self.charge_cutoff} V\n") - self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n") - self.current_cycle_file.write(f"# - Rest Time: {self.rest_time} hours\n") - elif self.current_mode == "Discharge Test": - self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n") + if getattr(self, 'current_mode', '') == "Cycle Test": + self.current_cycle_file.write(f"# - Charge Cutoff: {getattr(self, 'charge_cutoff', 'N/A')} V\n") + self.current_cycle_file.write(f"# - Discharge Cutoff: {getattr(self, 'discharge_cutoff', 'N/A')} V\n") + self.current_cycle_file.write(f"# - Rest Time: {getattr(self, 'rest_time', 'N/A')} hours\n") + elif getattr(self, 'current_mode', '') == "Discharge Test": + self.current_cycle_file.write(f"# - Discharge Cutoff: {getattr(self, 'discharge_cutoff', 'N/A')} V\n") self.current_cycle_file.write(f"# - Test Conditions: {test_conditions}\n") self.current_cycle_file.write(f"# Results:\n") - if self.current_mode == "Cycle Test": - self.current_cycle_file.write(f"# - Cycles Completed: {self.cycle_count}\n") - self.current_cycle_file.write(f"# - Final Discharge Capacity: {self.capacity_ah:.4f} Ah\n") - self.current_cycle_file.write(f"# - Final Charge Capacity: {self.charge_capacity:.4f} Ah\n") - self.current_cycle_file.write(f"# - Coulombic Efficiency: {self.coulomb_efficiency:.1f}%\n") + if getattr(self, 'current_mode', '') == "Cycle Test": + self.current_cycle_file.write(f"# - Cycles Completed: {getattr(self, 'cycle_count', 0)}\n") + self.current_cycle_file.write(f"# - Final Discharge Capacity: {getattr(self, 'capacity_ah', 0):.4f} Ah\n") + self.current_cycle_file.write(f"# - Final Charge Capacity: {getattr(self, 'charge_capacity', 0):.4f} Ah\n") + self.current_cycle_file.write(f"# - Coulombic Efficiency: {getattr(self, 'coulomb_efficiency', 0):.1f}%\n") else: - self.current_cycle_file.write(f"# - Capacity: {self.capacity_ah:.4f} Ah\n") - self.current_cycle_file.write(f"# - Energy: {self.energy:.4f} Wh\n") + self.current_cycle_file.write(f"# - Capacity: {getattr(self, 'capacity_ah', 0):.4f} Ah\n") + self.current_cycle_file.write(f"# - Energy: {getattr(self, 'energy', 0):.4f} Wh\n") self.current_cycle_file.close() except Exception as e: @@ -2088,42 +2172,147 @@ class BatteryTester(QMainWindow): return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def stop_test(self): - """Request immediate stop of the current test or monitoring""" + """Request immediate stop with proper visual feedback""" + # Immediate red button feedback + self.toggle_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.status_colors["warning"]}; + color: white; + font-weight: bold; + }} + """) + QApplication.processEvents() + if not self.test_running: + self.reset_button_state() return - self.request_stop = True - self.test_running = False - self.measuring = False + try: + # Stop operations + self.request_stop = True + self.test_running = False + self.measuring = False - # Stop test threads - for attr in ['test_sequence_worker', 'discharge_worker', 'charge_worker']: - if hasattr(self, attr): - worker = getattr(self, attr) - try: - if worker and not sip.isdeleted(worker): - worker.stop() - except: - pass + # Stop workers + workers = ['test_sequence_worker', 'discharge_worker', 'charge_worker'] + for worker in workers: + if hasattr(self, worker): + getattr(self, worker).stop() - # Reset UI - if self.active_device: - self.active_device.test_phase = "Idle" - self.phase_label.setText("Idle") + # Reset device + if self.active_device: + self.active_device.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.active_device.dev.channels['A'].constant(0) + self.handle_device_connection(True) # Confirm connection - # Reset button state + except Exception as e: + self.handle_device_connection(False, e) + finally: + # Clean up + self.reset_test_data() + self.reset_plot() + self.reset_button_state() + + if hasattr(self, 'current_cycle_file'): + self.finalize_log_file() + + self.status_bar.showMessage("Test stopped") + + def set_connection_status(self, text, color=None): + """Update connection status with optional color""" + if color is None: + if "error" in text.lower(): + color = self.status_colors["error"] + elif "disconnect" in text.lower(): + color = self.status_colors["disconnected"] + else: + color = self.status_colors["connected"] + + self.connection_label.setText(text) + self.status_light.setStyleSheet(f""" + background-color: {color}; + border-radius: 10px; + min-width: 12px; + max-width: 12px; + min-height: 12px; + max-height: 12px; + """) + QApplication.processEvents() + + def reset_button_state(self): + """Reset button to appropriate default state""" + mode = getattr(self, 'current_mode', 'Live Monitoring') + text = { + 'Cycle Test': "START CYCLE TEST", + 'Discharge Test': "START DISCHARGE", + 'Charge Test': "START CHARGE" + }.get(mode, "START") + self.toggle_button.setChecked(False) - if self.current_mode == "Cycle Test": - self.toggle_button.setText("START CYCLE TEST") - elif self.current_mode == "Discharge Test": - self.toggle_button.setText("START DISCHARGE") - elif self.current_mode == "Charge Test": - self.toggle_button.setText("START CHARGE") + self.toggle_button.setText(text) + self.toggle_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.status_colors["active"]}; + color: {self.fg_color}; + font-weight: bold; + }} + QPushButton:checked {{ + background-color: {self.status_colors["warning"]}; + }} + QPushButton:disabled {{ + background-color: #4C566A; + }} + """) - if self.current_mode == "Live Monitoring": - self.status_bar.showMessage("Live monitoring stopped") - else: - self.status_bar.showMessage("Test stopped by user") + def reset_button_style(self): + """Reset button to default style""" + mode = getattr(self, 'current_mode', 'Live Monitoring') + text = { + 'Cycle Test': "START CYCLE TEST", + 'Discharge Test': "START DISCHARGE", + 'Charge Test': "START CHARGE", + }.get(mode, "START") + + self.toggle_button.setText(text) + self.toggle_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.accent_color}; + color: {self.fg_color}; + font-weight: bold; + }} + QPushButton:checked {{ + background-color: {self.warning_color}; + }} + """) + + def reset_test_data(self): + """Completely reset all test data""" + if not self.active_device: + return + + dev = self.active_device + # Clear all data buffers + dev.time_data.clear() + dev.voltage_data.clear() + dev.current_data.clear() + dev.display_time_data.clear() + dev.display_voltage_data.clear() + dev.display_current_data.clear() + + # Reset statistics + dev.capacity_ah = 0.0 + dev.energy = 0.0 + dev.charge_capacity = 0.0 + dev.coulomb_efficiency = 0.0 + dev.test_phase = "Idle" + + # Reset UI displays + self.voltage_label.setText("0.000") + self.current_label.setText("0.000") + self.time_label.setText("00:00:00") + self.capacity_label.setText("0.0000") + self.energy_label.setText("0.0000") + self.phase_label.setText("Idle") def finalize_test(self): """Final cleanup after test completes or is stopped""" @@ -2308,38 +2497,18 @@ class BatteryTester(QMainWindow): self.status_bar.showMessage("Error during test finalization") def reset_plot(self): - """Completely reset the plot - clears all data and visuals""" - # Clear line data + """Completely reset the plot to initial state""" + # Clear plot data self.line_voltage.set_data([], []) self.line_current.set_data([], []) - # Reset axes with appropriate ranges - voltage_padding = 0.2 - min_voltage = 0 - max_voltage = 5.0 # Max voltage for ADALM1000 + # Reset axes + self.ax.set_xlim(0, 10) + self.ax.set_ylim(0, 5.0) # Full voltage range + self.ax2.set_ylim(-0.25, 0.25) # Full current range - self.ax.set_xlim(0, 10) # Reset X axis - self.ax.set_ylim(min_voltage, max_voltage) - self.ax.set_xlabel('Time (s)', color=self.fg_color) - self.ax.set_ylabel("Voltage (V)", color='#00BFFF') - self.ax.set_title('Battery Test', color=self.fg_color) - self.ax.tick_params(axis='x', colors=self.fg_color) - self.ax.tick_params(axis='y', labelcolor='#00BFFF') - self.ax.grid(True, color='#4C566A') - - # Reset twin axis (current) - current_padding = 0.05 - self.ax2.set_xlim(0, 10) - self.ax2.set_ylim(-0.25 - current_padding, 0.25 + current_padding) - self.ax2.set_ylabel("Current (A)", color='r') - self.ax2.tick_params(axis='y', labelcolor='r') - - # Redraw legends - self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) - self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) - - # Force immediate redraw - self.canvas.draw() + # Redraw with slight delay to ensure UI updates + QTimer.singleShot(50, self.canvas.draw_idle) def update_status_and_plot(self): """Combined status and plot update""" @@ -2414,21 +2583,11 @@ class BatteryTester(QMainWindow): @pyqtSlot(str) def handle_device_error(self, error_msg): - """Enhanced error handling with recovery""" - print(f"DEVICE ERROR: {error_msg}") - - # Special handling for USB errors - if "USB" in error_msg or "LIBUSB" in error_msg.upper(): - error_msg += "\n\nCheck USB connection and permissions" - self.status_light.setStyleSheet("background-color: red;") - else: - self.status_light.setStyleSheet("background-color: orange;") - - self.status_bar.showMessage(f"Device error: {error_msg}") - - # Attempt automatic recovery for non-critical errors - if "No samples" in error_msg or "timed out" in error_msg: - QTimer.singleShot(1000, self.reconnect_device) + """Handle device errors with proper connection status""" + self.handle_device_connection(False, f"Error: {error_msg}") + self.reconnect_btn.setVisible(True) + self.reconnect_btn.setEnabled(True) + self.toggle_button.setEnabled(False) def validate_measurements(self, voltage, current): """Filter out invalid measurements""" @@ -2493,163 +2652,137 @@ class BatteryTester(QMainWindow): QTimer.singleShot(1000, self.reconnect_device) - def reconnect_device(self): - """Robuste Geräte-Neuerkennung mit vollständigem Reset und Statusfeedback""" - self.status_bar.showMessage("Starting device reconnection...") - self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") - QApplication.processEvents() # Sofortiges UI-Update erzwingen - - # 1. Vorhandene Verbindungen sauber beenden - try: - if hasattr(self, 'measurement_thread'): - self.measurement_thread.stop() - if not self.measurement_thread.wait(1000): # Timeout 1s - self.measurement_thread.terminate() - except Exception as e: - print(f"Error stopping measurement thread: {e}") - - # 2. Alte Session bereinigen - if hasattr(self, 'session'): - try: - self.session.end() - except Exception as e: - print(f"Error ending session: {e}") - finally: - del self.session - - # 3. Neue Session mit Fortschrittsfeedback - retry_count = 0 - max_retries = 3 - reconnect_delay = 2000 # ms - - while retry_count < max_retries: - try: - self.status_bar.showMessage(f"Scanning for devices (Attempt {retry_count + 1}/{max_retries})...") - QApplication.processEvents() - - # Neue Session erstellen - self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) - - # Manueller Scan mit Timeout - scan_success = False - for _ in range(2): # Max 2 Scan-Versuche - self.session.scan() - if self.session.devices: - scan_success = True - break - time.sleep(0.5) - - if not scan_success: - raise DeviceDisconnectedError("No devices detected") - - # 4. Geräteliste aktualisieren - current_devices = {dev.serial: dev for dev in self.session.devices} - old_devices = self.devices.copy() if hasattr(self, 'devices') else {} - - # Neue Geräte hinzufügen - new_devices = {} - for serial, dev in current_devices.items(): - if serial in old_devices: - # Bestehendes Gerät wiederverwenden - new_devices[serial] = old_devices[serial] - else: - # Neues Gerät initialisieren - self.status_bar.showMessage(f"Initializing device {serial}...") - QApplication.processEvents() - manager = DeviceManager(dev) - manager.start_measurement(self.interval) - new_devices[serial] = manager - - # Nicht mehr vorhandene Geräte entfernen - for serial, manager in old_devices.items(): - if serial not in current_devices: - try: - manager.stop_measurement() - except Exception as e: - print(f"Error stopping device {serial}: {e}") - - self.devices = new_devices - - # 5. Aktives Gerät auswählen - current_serial = self.active_device.serial if (hasattr(self, 'active_device') and self.active_device) else None - - # UI aktualisieren - self.device_combo.clear() - for serial in self.devices: - self.device_combo.addItem(serial) - - if current_serial in self.devices: - self.device_combo.setCurrentText(current_serial) - self.active_device = self.devices[current_serial] - elif self.devices: - first_serial = next(iter(self.devices)) - self.device_combo.setCurrentText(first_serial) - self.active_device = self.devices[first_serial] - else: - raise DeviceDisconnectedError("No valid devices available") - - # Signalverbindungen herstellen - if hasattr(self.active_device, 'measurement_thread'): - self.measurement_thread = self.active_device.measurement_thread - self.measurement_thread.update_signal.connect(self.update_measurements) - self.measurement_thread.error_signal.connect(self.handle_device_error) - - # Erfolgsmeldung - self.connection_label.setText(f"Connected: {self.active_device.serial}") - self.status_bar.showMessage("Successfully reconnected devices") - self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") - self.toggle_button.setEnabled(True) - return - - except DeviceDisconnectedError as e: - retry_count += 1 - if retry_count < max_retries: - self.status_bar.showMessage(f"Reconnect failed: {e}. Retrying in {reconnect_delay/1000}s...") - QApplication.processEvents() - time.sleep(reconnect_delay/1000) - else: - self.status_bar.showMessage(f"Reconnect failed after {max_retries} attempts") - self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") - QTimer.singleShot(reconnect_delay, self.attempt_reconnect) - except Exception as e: - print(f"Critical reconnect error: {traceback.format_exc()}") - self.status_bar.showMessage(f"Critical error: {str(e)}") - self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") - QTimer.singleShot(reconnect_delay, self.attempt_reconnect) - return - - def closeEvent(self, event): - """Clean up on window close""" - self.test_running = False - self.measuring = False - self.session_active = False - - # Stop measurement thread - if hasattr(self, 'measurement_thread'): - self.measurement_thread.stop() - - # Stop test sequence thread + def cleanup_test_threads(self): + """Clean up any existing test threads before starting a new test""" + # Stop and clean up test sequence thread if it exists if hasattr(self, 'test_sequence_thread'): - if hasattr(self, 'test_sequence_worker'): - self.test_sequence_worker.stop() - self.test_sequence_thread.quit() - self.test_sequence_thread.wait(500) - - # Stop discharge thread - if hasattr(self, 'discharge_thread'): - if hasattr(self, 'discharge_worker'): - self.discharge_worker.stop() - self.discharge_thread.quit() - self.discharge_thread.wait(500) - - # Clean up device session - if hasattr(self, 'session') and self.session: try: - self.session.end() + if hasattr(self, 'test_sequence_worker'): + self.test_sequence_worker.stop() + self.test_sequence_thread.quit() + self.test_sequence_thread.wait(500) except Exception as e: - print(f"Error ending session: {e}") + print(f"Error cleaning up test sequence thread: {e}") + finally: + if hasattr(self, 'test_sequence_worker'): + try: + self.test_sequence_worker.deleteLater() + except: + pass + if hasattr(self, 'test_sequence_thread'): + try: + self.test_sequence_thread.deleteLater() + except: + pass - event.accept() + # Stop and clean up discharge thread if it exists + if hasattr(self, 'discharge_thread'): + try: + if hasattr(self, 'discharge_worker'): + self.discharge_worker.stop() + self.discharge_thread.quit() + self.discharge_thread.wait(500) + except Exception as e: + print(f"Error cleaning up discharge thread: {e}") + finally: + if hasattr(self, 'discharge_worker'): + try: + self.discharge_worker.deleteLater() + except: + pass + if hasattr(self, 'discharge_thread'): + try: + self.discharge_thread.deleteLater() + except: + pass + + # Stop and clean up charge thread if it exists + if hasattr(self, 'charge_thread'): + try: + if hasattr(self, 'charge_worker'): + self.charge_worker.stop() + self.charge_thread.quit() + self.charge_thread.wait(500) + except Exception as e: + print(f"Error cleaning up charge thread: {e}") + finally: + if hasattr(self, 'charge_worker'): + try: + self.charge_worker.deleteLater() + except: + pass + if hasattr(self, 'charge_thread'): + try: + self.charge_thread.deleteLater() + except: + pass + + def reconnect_device(self): + """Comprehensive device reconnection handler""" + try: + self.handle_device_connection(False, "Reconnecting...") + + # 1. Clean up existing connections + if hasattr(self, 'measurement_thread') and self.measurement_thread: + try: + self.measurement_thread.stop() + if not self.measurement_thread.wait(500): + self.measurement_thread.terminate() + except Exception as e: + print(f"Error stopping measurement thread: {e}") + + if hasattr(self, 'session') and self.session: + try: + self.session.end() + except Exception as e: + print(f"Error ending session: {e}") + + # 2. Initialize new session + self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) + self.session.scan() + + if not self.session.devices: + self.handle_device_connection(False, "No devices detected") + return + + self.session.start(0) + + # 3. Re-establish device connection + if hasattr(self, 'active_device') and self.active_device: + # Try to find the same device by serial + target_serial = self.active_device.serial + for dev in self.session.devices: + if dev.serial == target_serial: + # Recreate the DeviceManager + self.active_device = DeviceManager(dev) + self.devices[target_serial] = self.active_device + break + else: + # No previous device, just use first available + dev = self.session.devices[0] + self.active_device = DeviceManager(dev) + self.devices[dev.serial] = self.active_device + + # 4. Restart measurement system + self.active_device.start_measurement(self.interval) + + # Reconnect signals + self.measurement_thread = self.active_device.measurement_thread + self.measurement_thread.update_signal.connect(self.update_measurements) + self.measurement_thread.error_signal.connect(self.handle_device_error) + + # 5. Update UI + self.handle_device_connection(True, f"Reconnected: {self.active_device.serial}") + self.toggle_button.setEnabled(True) + + # Update device dropdown + self.device_combo.clear() + for serial in self.devices: + self.device_combo.addItem(serial) + self.device_combo.setCurrentText(self.active_device.serial) + + except Exception as e: + self.handle_device_connection(False, f"Reconnect failed: {str(e)}") if __name__ == "__main__": app = QApplication([])