# -*- coding: utf-8 -*- import os import time import csv import threading import traceback from datetime import datetime import numpy as np import matplotlib import subprocess matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure from collections import deque from queue import Queue, Full, Empty from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog, QComboBox) from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread from PyQt5.QtGui import QDoubleValidator from PyQt5 import sip import pysmu from pysmu import Session class DeviceManager: def __init__(self, dev): self.dev = dev self.serial = dev.serial self.measurement_thread = None self.is_running = False self.is_recording = False self.log_file = None self.log_writer = None self._last_log_time = 0 self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) self.session = None # Datenpuffer max_data_points = 36000 self.time_data = deque(maxlen=max_data_points) self.voltage_data = deque(maxlen=max_data_points) self.current_data = deque(maxlen=max_data_points) self.display_time_data = deque(maxlen=10000) self.display_voltage_data = deque(maxlen=10000) self.display_current_data = deque(maxlen=10000) # Testzustand self.capacity_ah = 0.0 self.energy = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 self.test_phase = "Idle" self.start_time = time.time() self.plot_mutex = threading.Lock() # Logging self.current_cycle_file = None self.log_writer = None # Downsampling self.downsample_factor = 1 self.aggregation_buffer = { 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0 } self.consecutive_read_errors = 0 # Track read failures self.max_consecutive_errors = 5 # Threshold before reset self.status_colors = { "connected": "green", "disconnected": "red", "error": "orange", } def handle_read_error(self, increment=1): """Enhanced device recovery with proper USB resource handling""" self.consecutive_read_errors += increment if self.consecutive_read_errors >= self.max_consecutive_errors: try: print(f"Attempting device recovery (errors: {self.consecutive_read_errors})...") # 1. First try soft reset try: if hasattr(self.dev, 'reset'): print("Attempting soft reset...") self.dev.reset() time.sleep(1.5) # Increased delay for reset to complete # Verify reset worked try: samples = self.dev.read(1, 500, True) if samples: self.consecutive_read_errors = 0 print("Soft reset successful") return True except Exception as e: print(f"Verification after soft reset failed: {e}") except Exception as e: print(f"Soft reset failed: {e}") # 2. Full reinitialization with USB reset print("Attempting full USB reset...") try: # Close existing session cleanly if hasattr(self, 'session') and self.session: try: print("Ending existing session...") self.session.end() time.sleep(0.5) except Exception as e: print(f"Error ending session: {e}") # Add delay and attempt USB reset time.sleep(2.0) # Longer delay for USB to settle # Try to release USB resources using libusb try: import usb.core # Find all ADALM1000 devices devices = usb.core.find(find_all=True, idVendor=0x064b, idProduct=0x784c) for dev in devices: try: print(f"Releasing USB device: {dev}") usb.util.dispose_resources(dev) except Exception as e: print(f"Error releasing USB device: {e}") time.sleep(1.0) except ImportError: print("pyusb not available, skipping resource release") # Try multiple times to scan for devices new_session = None for attempt in range(3): try: print(f"Scan attempt {attempt + 1}/3...") new_session = pysmu.Session(ignore_dataflow=True, queue_size=10000) devices = new_session.scan() if not devices: print(f"No devices found on attempt {attempt+1}/3") time.sleep(1.0) continue # Find our device by serial new_dev = None for d in devices: if d.serial == self.serial: new_dev = d break if new_dev: print(f"Found device {self.serial} on attempt {attempt+1}") self.dev = new_dev self.consecutive_read_errors = 0 self.session = new_session # Update session reference # Restart measurement thread if hasattr(self, 'measurement_thread'): try: self.measurement_thread.stop() time.sleep(0.5) except: pass self.start_measurement(self.interval) print("Device reinitialized successfully") return True else: print(f"Original device not found on attempt {attempt+1}") except Exception as e: print(f"Scan attempt {attempt+1} failed: {e}") time.sleep(1.0) print("Failed to find original device after 3 attempts") return False except Exception as e: print(f"Full reinitialization failed: {e}") # Continue to final fallback even if this fails pass except Exception as e: print(f"Device recovery failed: {e}") # Continue to final fallback pass # 3. Final fallback - try USB reset command try: print("Attempting USB port reset...") # ADALM1000 USB IDs vendor_id = "064b" product_id = "784c" try: # Linux-specific reset result = subprocess.run(['usbreset', f'{vendor_id}:{product_id}'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode == 0: print("USB reset command executed successfully, waiting 3 seconds...") time.sleep(3.0) return self.handle_read_error(0) # Retry with counter reset else: print(f"USB reset command failed with code {result.returncode}") print(f"Error output: {result.stderr}") except FileNotFoundError: # Windows alternative try: print("usbreset not found, trying Windows USB reset") # Use devcon utility for Windows result = subprocess.run(['devcon', 'restart', f'USB\VID_{vendor_id}&PID_{product_id}'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode == 0: print("USB device reset via devcon, waiting 3 seconds...") time.sleep(3.0) return self.handle_read_error(0) else: print(f"devcon reset failed with code {result.returncode}") print(f"Error output: {result.stderr}") except FileNotFoundError: # Python-based reset as final fallback print("devcon not found, attempting Python USB reset") try: # Try to import USB module try: import usb.core except ImportError: print("pyusb not installed, please install with: pip install pyusb") return False # Find device by vendor and product ID dev = usb.core.find(idVendor=0x064b, idProduct=0x784c) if dev is not None: try: # Reset the USB device dev.reset() print("USB device reset via pyusb, waiting 3 seconds...") time.sleep(3.0) return self.handle_read_error(0) except usb.core.USBError as e: print(f"USB reset error: {e}") else: print("USB device not found for reset") except Exception as e: print(f"Python USB reset failed: {e}") except Exception as e: print(f"USB reset command failed: {e}") import traceback traceback.print_exc() return True # Not enough errors yet to trigger recovery def start_measurement(self, interval=0.1): self.stop_measurement() # Ensure any existing thread is stopped self.measurement_thread = MeasurementThread(self.dev, interval, self) self.measurement_thread.start() self.is_running = True def stop_measurement(self): if self.measurement_thread: try: # Disconnect signals first try: self.measurement_thread.update_signal.disconnect() self.measurement_thread.error_signal.disconnect() except (RuntimeError, TypeError): pass # Then stop the thread self.measurement_thread.stop() if not self.measurement_thread.wait(500): self.measurement_thread.terminate() self.measurement_thread = None except Exception as e: print(f"Error stopping measurement thread: {e}") self.is_running = False def reset_data(self): self.time_data.clear() self.voltage_data.clear() self.current_data.clear() self.display_time_data.clear() self.display_voltage_data.clear() self.display_current_data.clear() self.aggregation_buffer = { 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0 } self.capacity_ah = 0.0 self.energy = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 self.start_time = time.time() self.test_phase = "Idle" class DeviceDisconnectedError(Exception): pass class MeasurementThread(QThread): update_signal = pyqtSignal(float, float, float) error_signal = pyqtSignal(str) def __init__(self, device, interval, parent_manager): super().__init__() self.device = device self.interval = interval self._running = False self.filter_window_size = 10 self.voltage_window = [] self.current_window = [] self.start_time = None self.measurement_queue = Queue(maxsize=1) self.current_direction = 1 self.parent_manager = parent_manager def stop(self): self._running = False try: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) except Exception as e: print(f"Error stopping device: {e}") self.wait(500) # Wait for thread to finish def is_measuring(self): return self.measurement_thread is not None and self.measurement_thread.isRunning() def run(self): """Measurement loop with enhanced recovery""" self._running = True self.start_time = time.time() while self._running: try: samples = self.device.read(self.filter_window_size, 500, True) # --- Handle empty samples --- if not samples: self.parent_manager.consecutive_read_errors += 1 if self.parent_manager.consecutive_read_errors >= self.parent_manager.max_consecutive_errors: # Attempt device reset through parent manager if hasattr(self, 'parent_manager'): if not self.parent_manager.handle_read_error(): raise DeviceDisconnectedError("Persistent read failures") time.sleep(0.1) continue # ✅ Reset error counter on successful read self.parent_manager.consecutive_read_errors = 0 # --- Process samples --- current_time = time.time() - self.start_time # Get voltage from Channel B (HI_Z mode) and current from Channel A raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction # Channel A current with direction # Update filter windows self.voltage_window.append(raw_voltage) self.current_window.append(raw_current) if len(self.voltage_window) > self.filter_window_size: self.voltage_window.pop(0) self.current_window.pop(0) voltage = np.mean(self.voltage_window) current = np.mean(self.current_window) # Validate measurements if not (-0.2 <= voltage <= 5.0): raise ValueError(f"Voltage out of range: {voltage:.4f}V") if not (-0.25 <= current <= 0.25): raise ValueError(f"Current out of range: {current:.4f}A") # Emit update self.update_signal.emit(voltage, current, current_time) # Store measurement try: self.measurement_queue.put_nowait((voltage, current)) except Full: pass time.sleep(max(0.05, self.interval)) except DeviceDisconnectedError as e: self.error_signal.emit(f"Device disconnected: {str(e)}") if not self.parent_manager.handle_read_error(): break time.sleep(1) except Exception as e: self.error_signal.emit(f"Read error: {str(e)}") if not self.parent_manager.handle_read_error(): break time.sleep(1) def set_direction(self, direction): """Set current direction (1 for source, -1 for sink)""" self.current_direction = direction class TestSequenceWorker(QObject): finished = pyqtSignal() update_phase = pyqtSignal(str) update_status = pyqtSignal(str) test_completed = pyqtSignal() error_occurred = pyqtSignal(str) def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent): super().__init__() self.device = device self.test_current = test_current self.charge_cutoff = charge_cutoff self.discharge_cutoff = discharge_cutoff self.rest_time = rest_time * 3600 # Convert hours to seconds self.continuous_mode = continuous_mode self.parent = parent self._running = True self.voltage_timeout = 0.5 # seconds def get_latest_measurement(self): """Thread-safe measurement reading with timeout""" try: return self.parent.measurement_thread.measurement_queue.get( timeout=self.voltage_timeout ) except Empty: return (None, None) # Return tuple for unpacking def charge_phase(self): """Handle the battery charging phase""" self.update_phase.emit("Charge") self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.4f}A") try: # Configure channels - Channel A sources current, Channel B measures voltage self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV self.device.channels['A'].constant(self.test_current) self.parent.measurement_thread.set_direction(1) # Source current # Small delay to allow current to stabilize time.sleep(0.1) while self._running: voltage, current = self.get_latest_measurement() if voltage is None: continue # Update parent's data for logging/display with self.parent.plot_mutex: if len(self.parent.voltage_data) > 0: self.parent.voltage_data[-1] = voltage self.parent.current_data[-1] = current if voltage >= self.charge_cutoff: break time.sleep(0.1) finally: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) def discharge_phase(self): """Handle the battery discharging phase""" voltage, _ = self.get_latest_measurement() if voltage is not None and voltage <= self.discharge_cutoff: self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)") return self.update_phase.emit("Discharge") self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A") try: # Configure channels - Channel A sinks current, Channel B measures voltage self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV self.device.channels['A'].constant(-self.test_current) self.parent.measurement_thread.set_direction(-1) # Sink current # Small delay to allow current to stabilize time.sleep(0.1) while self._running: voltage, current = self.get_latest_measurement() if voltage is None: continue # Update parent's data for logging/display with self.parent.plot_mutex: if len(self.parent.voltage_data) > 0: self.parent.voltage_data[-1] = voltage self.parent.current_data[-1] = current if voltage <= self.discharge_cutoff: break time.sleep(0.1) finally: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) def rest_phase(self, phase_name): """Handle rest period between phases""" self.update_phase.emit(f"Resting ({phase_name})") rest_end = time.time() + self.rest_time while time.time() < rest_end and self._running: time_left = max(0, rest_end - time.time()) self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min") time.sleep(1) def stop(self): self._running = False try: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) except Exception as e: print(f"Error stopping device: {e}") def run(self): """Main test sequence loop""" try: first_cycle = True # Ensure at least one cycle runs while (self._running and (self.parent.continuous_mode_check.isChecked() or first_cycle)): self.parent.request_stop = False self.parent.cycle_count += 1 first_cycle = False # Only True for the first cycle # 1. Charge phase (constant current) self.charge_phase() if not self._running or self.parent.request_stop: break # 2. Rest period after charge self.rest_phase("Post-Charge") if not self._running or self.parent.request_stop: break # 3. Discharge phase (capacity measurement) self.discharge_phase() if not self._running or self.parent.request_stop: break # 4. Rest period after discharge (only if not stopping) if self._running and not self.parent.request_stop: self.rest_phase("Post-Discharge") # Calculate Coulomb efficiency if not stopping if not self.parent.request_stop and self.parent.charge_capacity > 0: self.parent.coulomb_efficiency = ( self.parent.capacity_ah / self.parent.charge_capacity ) * 100 # Test completed self.test_completed.emit() except Exception as e: self.error_occurred.emit(f"Test sequence error: {str(e)}") finally: self.finished.emit() class DischargeWorker(QObject): finished = pyqtSignal() update_status = pyqtSignal(str) test_completed = pyqtSignal() error_occurred = pyqtSignal(str) def __init__(self, device, test_current, discharge_cutoff, parent): super().__init__() self.device = device self.test_current = test_current self.discharge_cutoff = discharge_cutoff self.parent = parent self._running = True self.voltage_timeout = 0.5 # seconds def get_latest_measurement(self): """Thread-safe measurement reading with timeout""" try: return self.parent.measurement_thread.measurement_queue.get( timeout=self.voltage_timeout ) except Empty: return (None, None) # Return tuple for unpacking def discharge_phase(self): """Handle the battery discharging phase""" voltage, _ = self.get_latest_measurement() if voltage is not None and voltage <= self.discharge_cutoff: self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)") return self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A") try: # Configure channels - Channel A sinks current, Channel B measures voltage self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV self.device.channels['A'].constant(-self.test_current) self.parent.measurement_thread.set_direction(-1) # Sink current # Small delay to allow current to stabilize time.sleep(0.1) while self._running: voltage, current = self.get_latest_measurement() if voltage is None: continue # Update parent's data for logging/display if self.parent.active_device: with self.parent.active_device.plot_mutex: if self.parent.active_device.voltage_data: self.parent.active_device.voltage_data[-1] = voltage self.parent.active_device.current_data[-1] = current if voltage <= self.discharge_cutoff: break time.sleep(0.1) finally: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) def stop(self): self._running = False try: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) except Exception as e: print(f"Error stopping device: {e}") def run(self): """Main discharge sequence""" try: self.parent.request_stop = False self.parent.cycle_count = 1 # Only one discharge cycle # Discharge phase self.discharge_phase() if not self._running or self.parent.request_stop: return # Test completed self.test_completed.emit() except Exception as e: self.error_occurred.emit(f"Discharge error: {str(e)}") finally: self.finished.emit() class ChargeWorker(QObject): finished = pyqtSignal() update_status = pyqtSignal(str) test_completed = pyqtSignal() error_occurred = pyqtSignal(str) def __init__(self, device, test_current, charge_cutoff, parent): super().__init__() self.device = device self.test_current = test_current self.charge_cutoff = charge_cutoff self.parent = parent self._running = True def run(self): """Main charge sequence""" try: self.parent.measurement_thread.set_direction(1) # Source current # Configure channels - Channel A sources current, Channel B measures voltage self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV self.device.channels['A'].constant(self.test_current) time.sleep(0.1) # Allow current to stabilize while self._running: voltage, current = self.parent.get_latest_measurement() if voltage is None: continue # Update parent's data for logging/display if self.parent.active_device: with self.parent.active_device.plot_mutex: if self.parent.active_device.voltage_data: self.parent.active_device.voltage_data[-1] = voltage self.parent.active_device.current_data[-1] = current if voltage >= self.charge_cutoff: break time.sleep(0.1) self.test_completed.emit() except Exception as e: self.error_occurred.emit(f"Charge error: {str(e)}") finally: self.device.channels['A'].constant(0) self.finished.emit() def stop(self): self._running = False try: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) except Exception as e: print(f"Error stopping device: {e}") class BatteryTester(QMainWindow): def __init__(self): self.plot_mutex = threading.Lock() super().__init__() self.devices = {} self.active_device = None self.last_logged_phase = None self.global_recording = False # Color scheme - MUST BE DEFINED FIRST self.bg_color = "#2E3440" self.fg_color = "#D8DEE9" self.accent_color = "#5E81AC" self.warning_color = "#BF616A" self.success_color = "#A3BE8C" # Status colors - MUST BE DEFINED BEFORE init_device() self.status_colors = { "connected": "green", "disconnected": "red", "error": "orange", "active": self.accent_color, "warning": self.warning_color } # Device and measurement state self.session_active = False self.measuring = False self.test_running = False self.continuous_mode = False self.request_stop = False self.interval = 0.1 self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) # Data buffers self.aggregation_buffer = { 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0 } self.phase_data = deque() self.downsample_factor = 1 # Initial kein Downsampling self.downsample_counter = 0 # Initialize all measurement variables self.capacity_ah = 0.0 self.energy = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 self.start_time = time.time() self.last_update_time = self.start_time self.capacity = 1.0 self.c_rate = 0.1 self.charge_cutoff = 1.43 self.discharge_cutoff = 0.01 self.rest_time = 0.5 # Initialize UI and device self.setup_ui() self.init_device() self.current_mode = "Live Monitoring" self.change_mode(self.current_mode) # Set window properties self.setWindowTitle("ADALM1000 - Battery Tester (Multi-Mode)") self.resize(1000, 800) self.setMinimumSize(800, 700) # Status update timer self.status_timer = QTimer() self.status_timer.timeout.connect(self.update_status_and_plot) self.status_timer.start(1000) #every second def setup_ui(self): """Configure the user interface with all elements properly organized""" # Main widget and layout self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.main_layout = QVBoxLayout(self.central_widget) self.main_layout.setContentsMargins(8, 8, 8, 8) self.main_layout.setSpacing(8) # Base style for consistent sizing base_style = f""" font-size: 10pt; color: {self.fg_color}; """ # Mode and device selection frame mode_frame = QFrame() mode_frame.setFrameShape(QFrame.StyledPanel) mode_frame.setStyleSheet(f""" QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; padding: 5px; }} QLabel {base_style} """) mode_layout = QHBoxLayout(mode_frame) mode_layout.setContentsMargins(5, 2, 5, 2) # Test mode selection self.mode_label = QLabel("Test Mode:") mode_layout.addWidget(self.mode_label) self.mode_combo = QComboBox() self.mode_combo.addItems(["Live Monitoring", "Discharge Test", "Charge Test", "Cycle Test"]) self.mode_combo.setStyleSheet(f""" QComboBox {{ {base_style} background-color: #3B4252; border: 1px solid #4C566A; border-radius: 3px; padding: 2px; min-height: 24px; }} """) self.mode_combo.currentTextChanged.connect(self.change_mode) mode_layout.addWidget(self.mode_combo, 1) # Device selection self.device_label = QLabel("Device:") mode_layout.addWidget(self.device_label) self.device_combo = QComboBox() self.device_combo.setStyleSheet(f""" QComboBox {{ {base_style} background-color: #3B4252; border: 1px solid #4C566A; border-radius: 3px; padding: 2px; min-height: 24px; }} """) self.device_combo.currentIndexChanged.connect(self.change_device) mode_layout.addWidget(self.device_combo, 1) self.main_layout.addWidget(mode_frame) # Header area header_frame = QFrame() header_frame.setFrameShape(QFrame.NoFrame) header_layout = QHBoxLayout(header_frame) header_layout.setContentsMargins(0, 0, 0, 0) self.title_label = QLabel("ADALM1000 Battery Tester") self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};") header_layout.addWidget(self.title_label, 1) # Status indicator self.status_light = QLabel() self.status_light.setFixedSize(16, 16) self.status_light.setStyleSheet("background-color: red; border-radius: 8px;") header_layout.addWidget(self.status_light) self.connection_label = QLabel("Disconnected") self.connection_label.setStyleSheet(base_style) header_layout.addWidget(self.connection_label) # Reconnect button self.reconnect_btn = QPushButton("Reconnect") self.reconnect_btn.setStyleSheet(f""" {base_style} background-color: #4C566A; padding: 2px 8px; border-radius: 3px; min-height: 24px; """) self.reconnect_btn.clicked.connect(self.reconnect_device) header_layout.addWidget(self.reconnect_btn) self.main_layout.addWidget(header_frame) # Measurement display - 4 rows x 2 columns display_frame = QFrame() display_frame.setFrameShape(QFrame.StyledPanel) display_frame.setStyleSheet(f""" QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; padding: 3px; }} QLabel {base_style} """) display_layout = QGridLayout(display_frame) display_layout.setHorizontalSpacing(5) display_layout.setVerticalSpacing(2) display_layout.setContentsMargins(5, 3, 5, 3) # Measurement fields in exact order measurement_fields = [ ("Voltage", "V"), ("Current", "A"), ("Elapsed Time", "s"), ("Energy", "Wh"), ("Test Phase", None), ("Capacity", "Ah"), # None for no unit ("Cycle Count", None), ("Coulomb Eff.", "%") ] for i, (label, unit) in enumerate(measurement_fields): row = i // 2 col = (i % 2) * 2 # Container for each measurement with fixed height container = QWidget() container.setFixedHeight(24) # Fixed row height container_layout = QHBoxLayout(container) container_layout.setContentsMargins(2, 0, 2, 0) container_layout.setSpacing(2) # Minimal spacing between elements # Label (fixed width) lbl = QLabel(f"{label}:") lbl.setStyleSheet("min-width: 85px;") container_layout.addWidget(lbl) # Value field (fixed width) value_text = "0.000" if unit else ("Idle" if label == "Test Phase" else "0") value_lbl = QLabel(value_text) value_lbl.setAlignment(Qt.AlignRight) value_lbl.setStyleSheet(""" font-weight: bold; min-width: 65px; max-width: 65px; """) container_layout.addWidget(value_lbl) # Unit (only if exists) if unit: unit_lbl = QLabel(unit) unit_lbl.setStyleSheet("min-width: 20px;") container_layout.addWidget(unit_lbl) display_layout.addWidget(container, row, col) # Assign references widgets = [ display_layout.itemAtPosition(r, c).widget().layout().itemAt(1).widget() for r in range(4) for c in [0, 2] ] (self.voltage_label, self.current_label, self.time_label, self.energy_label, self.phase_label, self.capacity_label, self.cycle_label, self.efficiency_label) = widgets self.main_layout.addWidget(display_frame) # Control area controls_frame = QFrame() controls_frame.setFrameShape(QFrame.NoFrame) controls_layout = QHBoxLayout(controls_frame) controls_layout.setContentsMargins(0, 0, 0, 0) # Parameters frame self.params_frame = QFrame() self.params_frame.setFrameShape(QFrame.StyledPanel) self.params_frame.setStyleSheet(f""" QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }} QLabel {base_style} QLineEdit {{ {base_style} background-color: #3B4252; border: 1px solid #4C566A; border-radius: 3px; padding: 2px; min-height: 24px; }} """) self.params_layout = QGridLayout(self.params_frame) self.params_layout.setVerticalSpacing(3) self.params_layout.setHorizontalSpacing(5) self.params_layout.setContentsMargins(8, 5, 8, 5) # Add parameter inputs row = 0 # Battery Capacity self.capacity_label_1 = QLabel("Capacity (Ah):") self.params_layout.addWidget(self.capacity_label_1, row, 0) self.capacity_input = QLineEdit("1.0") self.capacity_input.setValidator(QDoubleValidator(0.001, 100, 3)) self.params_layout.addWidget(self.capacity_input, row, 1) row += 1 # C-Rate self.c_rate_label = QLabel("C-Rate:") self.params_layout.addWidget(self.c_rate_label, row, 0) self.c_rate_input = QLineEdit("0.1") self.c_rate_input.setValidator(QDoubleValidator(0.01, 1, 2)) self.params_layout.addWidget(self.c_rate_input, row, 1) row += 1 # Charge Cutoff Voltage self.charge_cutoff_label = QLabel("Charge Cutoff (V):") self.params_layout.addWidget(self.charge_cutoff_label, row, 0) self.charge_cutoff_input = QLineEdit("1.43") self.charge_cutoff_input.setValidator(QDoubleValidator(0.1, 5.0, 3)) self.params_layout.addWidget(self.charge_cutoff_input, row, 1) row += 1 # Discharge Cutoff Voltage self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):") self.params_layout.addWidget(self.discharge_cutoff_label, row, 0) self.discharge_cutoff_input = QLineEdit("0.01") self.discharge_cutoff_input.setValidator(QDoubleValidator(0.1, 5.0, 3)) self.params_layout.addWidget(self.discharge_cutoff_input, row, 1) row += 1 # Rest Time self.rest_time_label = QLabel("Rest Time (h):") self.params_layout.addWidget(self.rest_time_label, row, 0) self.rest_time_input = QLineEdit("0.5") self.rest_time_input.setValidator(QDoubleValidator(0.1, 24, 1)) self.params_layout.addWidget(self.rest_time_input, row, 1) row += 1 # Test Conditions self.test_conditions_label = QLabel("Test Conditions:") self.params_layout.addWidget(self.test_conditions_label, row, 0) self.test_conditions_input = QLineEdit("Room Temperature") self.params_layout.addWidget(self.test_conditions_input, row, 1) controls_layout.addWidget(self.params_frame, 1) # Button frame button_frame = QFrame() button_frame.setFrameShape(QFrame.NoFrame) button_layout = QVBoxLayout(button_frame) button_layout.setContentsMargins(5, 0, 0, 0) button_layout.setSpacing(5) # Button style button_style = f""" QPushButton {{ {base_style} font-weight: bold; padding: 4px 8px; border-radius: 4px; min-height: 28px; color: {self.fg_color}; }} QPushButton:checked {{ background-color: {self.warning_color} !important; color: {self.fg_color} !important; }} QPushButton:disabled {{ background-color: #4C566A; }} """ # Single toggle button (Start/Stop) self.toggle_button = QPushButton("START") self.toggle_button.setCheckable(True) self.toggle_button.setStyleSheet(button_style + f""" background-color: {self.success_color}; min-width: 120px; """) self.toggle_button.clicked.connect(self.toggle_test) button_layout.addWidget(self.toggle_button) # Continuous mode checkbox (only for Cycle mode) self.continuous_mode_check = QCheckBox("Continuous Mode") self.continuous_mode_check.setChecked(True) self.continuous_mode_check.setStyleSheet(base_style) button_layout.addWidget(self.continuous_mode_check) self.continuous_mode_check.hide() # Record button for Live mode self.record_button = QPushButton("● Start Recording") self.record_button.setCheckable(True) self.record_button.setStyleSheet(button_style.replace( "background-color", "background-color", 1 ) + f"background-color: {self.success_color};") self.record_button.clicked.connect(self.toggle_global_recording) button_layout.addWidget(self.record_button) self.record_button.hide() controls_layout.addWidget(button_frame) self.main_layout.addWidget(controls_frame) # Plot area self.setup_plot() # Status bar self.status_bar = self.statusBar() self.status_bar.setStyleSheet(f"color: {self.fg_color}; font-size: 9pt;") self.status_bar.showMessage("Ready") # Apply dark theme self.setStyleSheet(f""" QMainWindow {{ background-color: {self.bg_color}; }} QWidget {{ {base_style} }} """) def apply_button_style(self): """Apply consistent button styling based on current state""" if self.toggle_button.isChecked(): # Stop state - red self.toggle_button.setStyleSheet(f""" QPushButton {{ background-color: {self.warning_color}; color: white; font-weight: bold; border: none; border-radius: 4px; padding: 4px 8px; min-height: 28px; }} QPushButton:hover {{ background-color: #{self.darker_color(self.warning_color)}; }} """) else: # Start state - green self.toggle_button.setStyleSheet(f""" QPushButton {{ background-color: {self.success_color}; color: {self.fg_color}; font-weight: bold; border: none; border-radius: 4px; padding: 4px 8px; min-height: 28px; }} QPushButton:hover {{ background-color: #{self.darker_color(self.success_color)}; }} """) # Update record button separately if self.record_button.isChecked(): self.record_button.setStyleSheet(f""" QPushButton {{ background-color: {self.warning_color}; color: white; font-weight: bold; border: none; border-radius: 4px; padding: 4px 8px; min-height: 28px; }} """) else: self.record_button.setStyleSheet(f""" QPushButton {{ background-color: {self.success_color}; color: {self.fg_color}; font-weight: bold; border: none; border-radius: 4px; padding: 4px 8px; min-height: 28px; }} """) def darker_color(self, hex_color): """Helper to generate a darker shade for hover effects""" if not hex_color.startswith('#'): hex_color = '#' + hex_color rgb = [int(hex_color[i:i+2], 16) for i in (1, 3, 5)] darker = [max(0, c - 40) for c in rgb] return ''.join([f"{c:02x}" for c in darker]) def toggle_global_recording(self): """Toggle recording for all connected devices simultaneously""" self.global_recording = not self.global_recording if self.global_recording: # Start recording for all devices for device in self.devices.values(): device.is_recording = True if not device.measurement_thread.isRunning(): device.start_measurement(self.interval) self.start_live_monitoring(device) self.record_button.setText("■ Stop Recording") self.status_bar.showMessage("Recording started for all connected devices") else: # Stop recording for all devices for device in self.devices.values(): self.finalize_device_log_file(device) device.is_recording = False self.record_button.setText("● Start Recording") self.status_bar.showMessage("Recording stopped for all devices") self.apply_button_style() def safe_execute(func): """Decorator to catch and log exceptions in Qt event handlers""" def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: print(f"Error in {func.__name__}: {str(e)}") traceback.print_exc() return wrapper @safe_execute def toggle_test(self, checked): if checked: self.start_test() else: self.stop_test() self.apply_button_style() def change_mode(self, mode_name): """Change between different test modes""" self.current_mode = mode_name self.stop_test() # Stop any current operation # Show/hide mode-specific UI elements show_charge = mode_name in ["Cycle Test", "Charge Test"] show_discharge = mode_name in ["Cycle Test", "Discharge Test"] show_rest = mode_name == "Cycle Test" self.charge_cutoff_label.setVisible(show_charge) self.charge_cutoff_input.setVisible(show_charge) self.discharge_cutoff_label.setVisible(show_discharge) self.discharge_cutoff_input.setVisible(show_discharge) self.rest_time_label.setVisible(show_rest) self.rest_time_input.setVisible(show_rest) # Continuous mode checkbox only for cycle test self.continuous_mode_check.setVisible(mode_name == "Cycle Test") # Record button only for live monitoring self.record_button.setVisible(mode_name == "Live Monitoring") # Set button text based on mode and make sure it's visible for test modes if mode_name == "Cycle Test": self.toggle_button.setText("START CYCLE TEST") self.toggle_button.show() elif mode_name == "Discharge Test": self.toggle_button.setText("START DISCHARGE") self.toggle_button.show() elif mode_name == "Charge Test": self.toggle_button.setText("START CHARGE") self.toggle_button.show() elif mode_name == "Live Monitoring": self.toggle_button.hide() # Only hide for Live Monitoring # Reset button state self.toggle_button.setChecked(False) self.toggle_button.setEnabled(True) self.apply_button_style() # Reset measurement state and zero the time if self.active_device: dev = self.active_device dev.reset_data() # Reset the measurement thread's start time if hasattr(dev, 'measurement_thread'): dev.measurement_thread.start_time = time.time() # Reset UI displays self.capacity_label.setText("0.0000") self.energy_label.setText("0.0000") self.cycle_label.setText("0") self.phase_label.setText("Idle") #self.time_label.setText("00:00:00") # Reset plot self.reset_plot() self.status_bar.showMessage(f"Mode changed to {mode_name}") # Update recording button if mode_name == "Live Monitoring": self.record_button.setVisible(True) if self.active_device and self.active_device.is_recording: self.record_button.setChecked(True) self.record_button.setText("Stop Recording") else: self.record_button.setChecked(False) self.record_button.setText("Start Recording") else: self.record_button.setVisible(False) self.apply_button_style() self.status_bar.showMessage(f"Mode changed to {mode_name}") def reset_test(self): if not self.active_device: return dev_manager = self.active_device dev_manager.reset_data() # Reset in DeviceManager # UI zurücksetzen self.capacity_label.setText("0.0000") self.energy_label.setText("0.0000") self.cycle_label.setText("0") self.phase_label.setText("Idle") def toggle_recording(self): """Toggle data recording in Live Monitoring mode""" if not self.active_device: return dev = self.active_device if not dev.is_recording: # Use device's recording state try: if self.create_cycle_log_file(): dev.is_recording = True # Set device's recording state self.record_button.setText("Stop Recording") self.apply_button_style() # Update style self.status_bar.showMessage("Live recording started") # Ensure monitoring is running if not self.test_running: self.start_live_monitoring(self.active_device) else: self.record_button.setChecked(False) self.current_cycle_file = None except Exception as e: print(f"Error starting recording: {e}") self.record_button.setChecked(False) self.current_cycle_file = None QMessageBox.critical(self, "Error", f"Failed to start recording:\n{str(e)}") else: # Stop recording try: if hasattr(self, 'current_cycle_file') and self.current_cycle_file: self.finalize_log_file() dev.is_recording = False # Clear device's recording state self.record_button.setText("Start Recording") self.apply_button_style() # Update style self.status_bar.showMessage("Live recording stopped") except Exception as e: print(f"Error stopping recording: {e}") def handle_continuous_mode_change(self, state): """Handle changes to continuous mode checkbox during operation""" if not state and self.test_running: # If unchecked during test self.status_bar.showMessage("Continuous mode disabled - will complete current cycle") self.continuous_mode_check.setStyleSheet(f"color: {self.warning_color};") QTimer.singleShot(2000, lambda: self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")) def setup_plot(self): """Configure the matplotlib plot""" self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color) self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) self.ax = self.fig.add_subplot(111) self.ax.set_facecolor('#3B4252') # Set initial voltage range voltage_padding = 0.2 min_voltage = max(0, 0.9 - voltage_padding) max_voltage = 1.43 + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) # Voltage plot self.line_voltage, = self.ax.plot([0], [0], color='#00BFFF', label='Voltage (V)', linewidth=2) self.ax.set_ylabel("Voltage (V)", color='#00BFFF') self.ax.tick_params(axis='y', labelcolor='#00BFFF') # Current plot (right axis) self.ax2 = self.ax.twinx() current_padding = 0.05 test_current = 0.1 * 0.2 # Default values max_current = test_current * 1.5 self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) self.line_current, = self.ax2.plot([0], [0], 'r-', label='Current (A)', linewidth=2) self.ax2.set_ylabel("Current (A)", color='r') self.ax2.tick_params(axis='y', labelcolor='r') self.ax.set_xlabel('Time (s)', color=self.fg_color) self.ax.set_title('Battery Test', color=self.fg_color) self.ax.tick_params(axis='x', colors=self.fg_color) self.ax.grid(True, color='#4C566A') # Position legends self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) # Embed plot self.canvas = FigureCanvas(self.fig) self.canvas.setStyleSheet(f"background-color: {self.bg_color};") self.main_layout.addWidget(self.canvas, 1) def init_device(self): """Robust device initialization""" try: # Close existing session if hasattr(self, 'session'): try: self.session.end() except: pass # Create new session self.session = Session(ignore_dataflow=True, queue_size=10000) self.session.scan() # Retry mechanism retry_count = 0 while not self.session.devices and retry_count < 3: self.handle_device_connection(False, f"Scanning... (Attempt {retry_count+1}/3)") QApplication.processEvents() time.sleep(1) self.session.scan() retry_count += 1 if not self.session.devices: self.handle_device_connection(False, "No devices found") return self.devices = {} self.active_device = None for dev in self.session.devices: manager = DeviceManager(dev) manager.start_measurement(interval=self.interval) self.devices[dev.serial] = manager # Connect signals for all devices manager.measurement_thread.update_signal.connect(self.update_measurements) manager.measurement_thread.error_signal.connect(self.handle_device_error) # Select first device first_serial = next(iter(self.devices.keys())) self.active_device = self.devices[first_serial] # Update UI self.device_combo.clear() for serial in self.devices: self.device_combo.addItem(serial) self.device_combo.setCurrentText(first_serial) self.session_active = True self.handle_device_connection(True, f"Connected: {first_serial}") self.toggle_button.setEnabled(True) # Connect measurement signals self.measurement_thread = self.active_device.measurement_thread self.measurement_thread.update_signal.connect(self.update_measurements) self.measurement_thread.error_signal.connect(self.handle_device_error) # Initialize recording state self.global_recording = False self.record_button.setChecked(False) self.record_button.setText("Start Recording") except Exception as e: self.handle_device_connection(False, f"Initialization failed: {str(e)}") def handle_no_devices(self): """Handle case when no devices are found""" self.session_active = False self.active_device = None self.status_bar.showMessage("No ADALM1000 devices found") self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") self.toggle_button.setEnabled(False) self.device_combo.clear() # Show reconnect button self.reconnect_btn.setEnabled(True) self.reconnect_btn.setVisible(True) def handle_device_connection(self, connected, message=None): """Update connection status with proper coloring""" if connected: status = "connected" if not message: message = "Connected" else: status = "error" if "fail" in message.lower() else "disconnected" if not message: message = "Disconnected" color = self.status_colors.get(status, "orange") self.connection_label.setText(message) self.status_light.setStyleSheet(f""" background-color: {color}; border-radius: 10px; min-width: 12px; max-width: 12px; min-height: 12px; max-height: 12px; """) QApplication.processEvents() def check_connection(self): """Periodically verify device connection""" if not hasattr(self, 'session') or not self.session.devices: self.handle_device_error("Device disconnected") self.reconnect_device() def request_usb_permissions(self): """Handle USB permission issues with user interaction""" msg = QMessageBox(self) msg.setIcon(QMessageBox.Critical) msg.setWindowTitle("USB Permission Required") msg.setText("Permission needed to access ADALM1000 devices") msg.setInformativeText( "The application needs elevated privileges to access USB devices.\n\n" "Please choose an option:" ) # Add buttons sudo_button = msg.addButton("Run as Administrator", QMessageBox.ActionRole) udev_button = msg.addButton("Fix Permissions", QMessageBox.ActionRole) cancel_button = msg.addButton(QMessageBox.Cancel) msg.exec_() if msg.clickedButton() == sudo_button: # Restart with sudo QMessageBox.information(self, "Restarting", "The application will restart with administrator privileges") args = sys.argv[:] args.insert(0, sys.executable) os.execvp("sudo", ["sudo"] + args) elif msg.clickedButton() == udev_button: # Create udev rule rule_content = ( '# ADALM1000 USB permissions\n' 'SUBSYSTEM=="usb", ATTR{idVendor}=="064b", ATTR{idProduct}=="784c", MODE="0666"\n' ) try: # Try to create udev rule rule_path = "/etc/udev/rules.d/52-adalm1000.rules" with open(rule_path, "w") as f: f.write(rule_content) # Apply rules os.system("sudo udevadm control --reload-rules") os.system("sudo udevadm trigger") QMessageBox.information(self, "Permissions Fixed", "USB permissions configured. Please reconnect devices.") except Exception as e: QMessageBox.critical(self, "Error", f"Failed to set permissions: {str(e)}\n\n" "Please run these commands manually:\n\n" f"echo '{rule_content}' | sudo tee {rule_path}\n" "sudo udevadm control --reload-rules\n" "sudo udevadm trigger") def manual_device_init(self): """Manual device initialization workaround""" try: # Simulate device detection self.device_combo.clear() self.device_combo.addItem("ADALM1000-1 (Simulated)") self.device_combo.addItem("ADALM1000-2 (Simulated)") # Mock connection self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") self.connection_label.setText("Simulated Devices") self.session_active = True self.toggle_button.setEnabled(True) QMessageBox.warning(self, "Simulation Mode", "Using simulated devices - real hardware not detected") except Exception as e: print(f"Manual init failed: {e}") def change_device(self, index): if not self.session_active or index < 0: return serial = self.device_combo.itemText(index) if serial not in self.devices: return # Store current device state before switching old_device = self.active_device # Activate new device self.active_device = self.devices[serial] # Reconnect signals for new device self.measurement_thread = self.active_device.measurement_thread self.measurement_thread.update_signal.connect(self.update_measurements) self.measurement_thread.error_signal.connect(self.handle_device_error) # Restart measurement if not running if not self.measurement_thread.isRunning(): self.active_device.start_measurement(self.interval) # Update UI with current device data self.update_ui_from_active_device() # Preserve recording state for old device if old_device and old_device.is_recording: # Ensure old device continues recording if not old_device.measurement_thread.isRunning(): old_device.start_measurement(self.interval) # Update recording button for new device self.record_button.setChecked(self.active_device.is_recording) self.record_button.setText("Stop Recording" if self.active_device.is_recording else "Start Recording") self.apply_button_style() self.status_bar.showMessage(f"Switched to device: {serial}") def update_ui_from_active_device(self): dev = self.active_device if not dev: return with self.plot_mutex: # Kopiere aktuelle Daten x = list(dev.display_time_data) y_v = list(dev.display_voltage_data) y_c = list(dev.display_current_data) # Aktualisiere Plot self.line_voltage.set_data(x, y_v) self.line_current.set_data(x, y_c) self.auto_scale_axes() self.canvas.draw_idle() # Aktualisiere Labels if dev.voltage_data: v = dev.voltage_data[-1] i = dev.current_data[-1] t = dev.time_data[-1] if dev.time_data else 0 self.voltage_label.setText(f"{v:.4f}") self.current_label.setText(f"{abs(i):.4f}") self.time_label.setText(self.format_time(t)) self.capacity_label.setText(f"{dev.capacity_ah:.4f}") self.energy_label.setText(f"{dev.energy:.4f}") self.cycle_label.setText(str(dev.cycle_count)) self.phase_label.setText(dev.test_phase) @safe_execute @pyqtSlot(str, float, float, float) def update_measurements(self, serial, voltage, current, current_time): """Update measurements for a specific device identified by serial""" # Get device from serial device = self.devices.get(serial) if not device: return # Update device data device.time_data.append(current_time) device.voltage_data.append(voltage) device.current_data.append(current) # Calculate metrics power = voltage * abs(current) if len(device.time_data) > 1: delta_t = device.time_data[-1] - device.time_data[-2] device.capacity_ah += abs(current) * delta_t / 3600 # Ah device.energy += power * delta_t / 3600 # Wh # Update display buffers device.display_time_data.append(current_time) device.display_voltage_data.append(voltage) device.display_current_data.append(current) # Update UI only if this is the active device if device == self.active_device: self.voltage_label.setText(f"{voltage:.4f}") self.current_label.setText(f"{abs(current):.4f}") self.capacity_label.setText(f"{device.capacity_ah:.4f}") self.energy_label.setText(f"{device.energy:.4f}") self.time_label.setText(self.format_time(current_time)) # Update plot if needed (only for active device) if time.time() - getattr(device, '_last_plot_update', 0) > 0.5: # Throttle updates device._last_plot_update = time.time() self.update_plot() # Handle recording for this device now = time.time() if device.is_recording and device.log_writer and device.time_data: if now - device._last_log_time >= 1.0: # Log at 1Hz try: device.log_writer.writerow([ f"{current_time:.2f}", f"{voltage:.6f}", f"{current:.6f}", f"{device.capacity_ah:.6f}", f"{power:.6f}", f"{device.energy:.6f}", device.test_phase ]) device.log_file.flush() device._last_log_time = now except Exception as e: print(f"Log write error for device {serial}: {e}") # Attempt to close and reopen log file try: if device.log_file: device.log_file.close() except: pass device.log_file = None device.log_writer = None device.is_recording = False # Update UI if this is the active device if device == self.active_device: self.record_button.setChecked(False) self.record_button.setText("Start Recording") self.apply_button_style() def adjust_downsampling(self): current_length = len(self.time_data) if current_length > self.max_points_to_keep * 1.5: # Exponentiell erhöhen, aber max. 64 new_factor = min(64, max(1, self.downsample_factor * 2)) elif current_length < self.max_points_to_keep // 2: # Halbieren, aber min. 1 new_factor = max(1, self.downsample_factor // 2) else: return if new_factor != self.downsample_factor: self.downsample_factor = new_factor self.status_bar.showMessage( f"Downsampling: Factor {self.downsample_factor}", 2000) def update_status_and_plot(self): """Combined status and plot update""" self.update_status() self.update_plot() def update_status(self): """Update status information periodically""" now = time.time() # Update all devices for dev_manager in self.devices.values(): # Only process if device has data if not dev_manager.time_data: continue # Update capacity calculation if len(dev_manager.time_data) > 1: idx = len(dev_manager.time_data) - 1 delta_t = dev_manager.time_data[idx] - dev_manager.time_data[idx-1] current_val = abs(dev_manager.current_data[idx]) power = dev_manager.voltage_data[idx] * current_val dev_manager.capacity_ah += current_val * delta_t / 3600 dev_manager.energy += power * delta_t / 3600 # Write to log if recording if dev_manager.is_recording and dev_manager.log_writer: try: if now - getattr(dev_manager, '_last_log_time', 0) >= 1.0: dev_manager.log_writer.writerow([ f"{dev_manager.time_data[-1]:.4f}", f"{dev_manager.voltage_data[-1]:.6f}", f"{dev_manager.current_data[-1]:.6f}", "Live", f"{dev_manager.capacity_ah:.4f}", f"{power:.4f}", f"{dev_manager.energy:.4f}" ]) dev_manager.log_file.flush() dev_manager._last_log_time = now except Exception as e: print(f"Log write error for device {dev_manager.serial}: {e}") dev_manager.is_recording = False # Update UI for active device if self.active_device and self.active_device.time_data: dev = self.active_device self.capacity_label.setText(f"{dev.capacity_ah:.4f}") self.energy_label.setText(f"{dev.energy:.4f}") # Update elapsed-time display if self.active_device and self.active_device.time_data: elapsed = self.active_device.time_data[-1] self.time_label.setText(self.format_time(elapsed)) @safe_execute def start_test(self): """Start the selected test mode using the active device""" if not self.active_device: QMessageBox.warning(self, "No Device", "No ADALM1000 device selected.") self.toggle_button.setChecked(False) self.apply_button_style() return dev_manager = self.active_device dev = dev_manager.dev # Clean up any previous test self.cleanup_test_threads() # Reset test state for active device self.reset_test() # Reset measurement thread timing dev_manager.measurement_thread.start_time = time.time() dev_manager.measurement_thread.voltage_window.clear() dev_manager.measurement_thread.current_window.clear() with dev_manager.measurement_thread.measurement_queue.mutex: dev_manager.measurement_thread.measurement_queue.queue.clear() self.time_label.setText("00:00:00") # Reset data buffers dev_manager.time_data.clear() dev_manager.voltage_data.clear() dev_manager.current_data.clear() dev_manager.display_time_data.clear() dev_manager.display_voltage_data.clear() dev_manager.display_current_data.clear() dev_manager.aggregation_buffer = { 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0 } # Reset device state try: dev.channels['A'].mode = pysmu.Mode.HI_Z dev.channels['A'].constant(0) dev.channels['B'].mode = pysmu.Mode.HI_Z dev.channels['B'].constant(0) except Exception as e: QMessageBox.critical(self, "Device Error", f"Failed to reset device: {e}") self.toggle_button.setChecked(False) self.apply_button_style() return # Reset test variables dev_manager.capacity_ah = 0.0 dev_manager.energy = 0.0 dev_manager.charge_capacity = 0.0 dev_manager.coulomb_efficiency = 0.0 dev_manager.cycle_count = 0 dev_manager.start_time = time.time() dev_manager.test_phase = "Running" # Set global state self.test_running = True self.request_stop = False # Update UI self.phase_label.setText(dev_manager.test_phase) self.toggle_button.setText("STOP") self.apply_button_style() # Get parameters from UI try: self.capacity = float(self.capacity_input.text()) self.c_rate = float(self.c_rate_input.text()) test_current = self.c_rate * self.capacity if test_current > 0.2: raise ValueError("Current must be ≤ 200mA (0.2A) for ADALM1000") except ValueError as e: QMessageBox.critical(self, "Input Error", str(e)) self.stop_test() return # Create log file - using start_live_monitoring instead of _start_device_recording if not self.start_live_monitoring(self.active_device): self.stop_test() return # Start the appropriate test if self.current_mode == "Cycle Test": try: self.charge_cutoff = float(self.charge_cutoff_input.text()) self.discharge_cutoff = float(self.discharge_cutoff_input.text()) self.rest_time = float(self.rest_time_input.text()) if self.charge_cutoff <= self.discharge_cutoff: raise ValueError("Charge cutoff must be higher than discharge cutoff") # Start test sequence self.test_sequence_thread = QThread() self.test_sequence_worker = TestSequenceWorker( dev, test_current, self.charge_cutoff, self.discharge_cutoff, self.rest_time, self.continuous_mode_check.isChecked(), self ) self.test_sequence_worker.moveToThread(self.test_sequence_thread) self.test_sequence_worker.update_phase.connect(self.update_test_phase) self.test_sequence_worker.update_status.connect(self.status_bar.showMessage) self.test_sequence_worker.test_completed.connect(self.finalize_test) self.test_sequence_worker.error_occurred.connect(self.handle_test_error) self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit) self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater) self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater) self.test_sequence_thread.start() QTimer.singleShot(0, self.test_sequence_worker.run) self.status_bar.showMessage(f"Cycle test started | Device: {dev.serial} | Current: {test_current:.4f}A") except Exception as e: QMessageBox.critical(self, "Error", str(e)) self.stop_test() elif self.current_mode == "Discharge Test": try: self.discharge_cutoff = float(self.discharge_cutoff_input.text()) self.discharge_thread = QThread() self.discharge_worker = DischargeWorker(dev, test_current, self.discharge_cutoff, self) self.discharge_worker.moveToThread(self.discharge_thread) self.discharge_worker.update_status.connect(self.status_bar.showMessage) self.discharge_worker.test_completed.connect(self.finalize_test) self.discharge_worker.error_occurred.connect(self.handle_test_error) self.discharge_worker.finished.connect(self.discharge_thread.quit) self.discharge_worker.finished.connect(self.discharge_worker.deleteLater) self.discharge_thread.finished.connect(self.discharge_thread.deleteLater) self.discharge_thread.start() QTimer.singleShot(0, self.discharge_worker.run) self.status_bar.showMessage(f"Discharge test started | Device: {dev.serial} | Current: {test_current:.4f}A") except Exception as e: QMessageBox.critical(self, "Error", str(e)) self.stop_test() elif self.current_mode == "Charge Test": try: self.charge_cutoff = float(self.charge_cutoff_input.text()) self.charge_thread = QThread() self.charge_worker = ChargeWorker(dev, test_current, self.charge_cutoff, self) self.charge_worker.moveToThread(self.charge_thread) self.charge_worker.update_status.connect(self.status_bar.showMessage) self.charge_worker.test_completed.connect(self.finalize_test) self.charge_worker.error_occurred.connect(self.handle_test_error) self.charge_worker.finished.connect(self.charge_thread.quit) self.charge_worker.finished.connect(self.charge_worker.deleteLater) self.charge_thread.finished.connect(self.charge_thread.deleteLater) self.charge_thread.start() QTimer.singleShot(0, self.charge_worker.run) self.status_bar.showMessage(f"Charge test started | Device: {dev.serial} | Current: {test_current:.4f}A") except Exception as e: QMessageBox.critical(self, "Error", str(e)) self.stop_test() elif self.current_mode == "Live Monitoring": # Ensure we pass the device parameter self.start_live_monitoring(self.active_device) def start_cycle_test(self): """Start the battery cycle test""" # Clean up any previous test if hasattr(self, 'test_sequence_worker'): try: self.test_sequence_worker.stop() except: pass self.test_sequence_worker.deleteLater() if hasattr(self, 'test_sequence_thread'): self.test_sequence_thread.quit() self.test_sequence_thread.wait() self.test_sequence_thread.deleteLater() del self.test_sequence_thread self.reset_test() self.reset_plot() if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() # Reset stop flag self.request_stop = False if not self.test_running: try: # Get parameters from UI self.capacity = float(self.capacity_input.text()) self.charge_cutoff = float(self.charge_cutoff_input.text()) self.discharge_cutoff = float(self.discharge_cutoff_input.text()) self.rest_time = float(self.rest_time_input.text()) self.c_rate = float(self.c_rate_input.text()) # Validate inputs if self.capacity <= 0: raise ValueError("Battery capacity must be positive") if self.charge_cutoff <= self.discharge_cutoff: raise ValueError("Charge cutoff must be higher than discharge cutoff") if self.c_rate <= 0: raise ValueError("C-rate must be positive") test_current = self.c_rate * self.capacity if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") # Clear ALL previous data completely with self.plot_mutex: self.time_data.clear() self.voltage_data.clear() self.current_data.clear() self.phase_data.clear() # Reset capacities and timing self.start_time = time.time() self.last_update_time = self.start_time self.capacity_ah = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 self.energy = 0.0 # Reset measurement thread's timer and queues if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() self.measurement_thread.voltage_window.clear() self.measurement_thread.current_window.clear() with self.measurement_thread.measurement_queue.mutex: self.measurement_thread.measurement_queue.queue.clear() # Reset plot completely self.reset_plot() # Start test self.test_running = True self.start_time = time.time() self.last_update_time = time.time() self.test_phase = "Initial Discharge" self.phase_label.setText(self.test_phase) self.toggle_button.setChecked(True) self.toggle_button.setText("STOP") self.apply_button_style() self.status_bar.showMessage(f"Cycle test started | Current: {test_current:.4f}A") # Create log file self.create_cycle_log_file() # Start test sequence in a QThread self.test_sequence_thread = QThread() self.test_sequence_worker = TestSequenceWorker( self.active_device.dev, test_current, self.charge_cutoff, self.discharge_cutoff, self.rest_time, self.continuous_mode_check.isChecked(), self # Pass reference to main window for callbacks ) self.test_sequence_worker.moveToThread(self.test_sequence_thread) # Connect signals self.test_sequence_worker.update_phase.connect(self.update_test_phase) self.test_sequence_worker.update_status.connect(self.status_bar.showMessage) self.test_sequence_worker.test_completed.connect(self.finalize_test) self.test_sequence_worker.error_occurred.connect(self.handle_test_error) self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit) self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater) self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater) # Start the thread and the worker's run method self.test_sequence_thread.start() QTimer.singleShot(0, self.test_sequence_worker.run) except Exception as e: QMessageBox.critical(self, "Error", str(e)) # Ensure buttons are in correct state if error occurs self.toggle_button.setChecked(False) self.toggle_button.setText("START") self.apply_button_style() self.toggle_button.setEnabled(True) def start_discharge_test(self): """Start the battery discharge test""" # Clean up any previous test self.reset_test() # löscht time_data, voltage_data, current_data, display_*, phase_data self.reset_plot() if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() if hasattr(self, 'discharge_worker'): try: self.discharge_worker.stop() except: pass self.discharge_worker.deleteLater() if hasattr(self, 'discharge_thread'): self.discharge_thread.quit() self.discharge_thread.wait() # warte unbegrenzt, bis er wirklich fertig ist self.discharge_thread.deleteLater() del self.discharge_thread # Reset stop flag self.request_stop = False if not self.test_running: try: # Get parameters from UI self.capacity = float(self.capacity_input.text()) self.discharge_cutoff = float(self.discharge_cutoff_input.text()) self.c_rate = float(self.c_rate_input.text()) # Validate inputs if self.capacity <= 0: raise ValueError("Battery capacity must be positive") if self.c_rate <= 0: raise ValueError("C-rate must be positive") test_current = self.c_rate * self.capacity if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") # Clear ALL previous data completely with self.plot_mutex: self.time_data.clear() self.voltage_data.clear() self.current_data.clear() # Reset capacities and timing self.start_time = time.time() self.last_update_time = self.start_time self.capacity_ah = 0.0 self.energy = 0.0 self.cycle_count = 1 # Reset measurement thread's timer and queues if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() self.measurement_thread.voltage_window.clear() self.measurement_thread.current_window.clear() with self.measurement_thread.measurement_queue.mutex: self.measurement_thread.measurement_queue.queue.clear() # Reset plot completely self.reset_plot() # Start test self.test_running = True self.start_time = time.time() self.last_update_time = time.time() self.test_phase = "Discharge" self.phase_label.setText(self.test_phase) self.toggle_button.setChecked(True) self.toggle_button.setText("STOP") self.apply_button_style() self.status_bar.showMessage(f"Discharge started | Current: {test_current:.4f}A") # Create log file self.create_cycle_log_file() # Start discharge worker in a QThread self.discharge_thread = QThread() self.discharge_worker = DischargeWorker( self.active_device.dev, test_current, self.discharge_cutoff, self # Pass reference to main window for callbacks ) self.discharge_worker.moveToThread(self.discharge_thread) # Connect signals self.discharge_worker.update_status.connect(self.status_bar.showMessage) self.discharge_worker.test_completed.connect(self.finalize_test) self.discharge_worker.error_occurred.connect(self.handle_test_error) self.discharge_worker.finished.connect(self.discharge_thread.quit) self.discharge_worker.finished.connect(self.discharge_worker.deleteLater) self.discharge_thread.finished.connect(self.discharge_thread.deleteLater) # Start the thread and the worker's run method self.discharge_thread.start() QTimer.singleShot(0, self.discharge_worker.run) except Exception as e: QMessageBox.critical(self, "Error", str(e)) # Ensure buttons are in correct state if error occurs self.toggle_button.setChecked(False) self.toggle_button.setText("START") self.apply_button_style() self.toggle_button.setEnabled(True) def start_charge_test(self): """Start the battery charge test""" # Clean up any previous test if hasattr(self, 'charge_worker'): try: self.charge_worker.stop() except: pass self.charge_worker.deleteLater() if hasattr(self, 'charge_thread'): self.charge_thread.quit() self.charge_thread.wait() self.charge_thread.deleteLater() del self.charge_thread self.reset_test() self.reset_plot() if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() # Reset stop flag self.request_stop = False if not self.test_running: try: # Get parameters from UI self.capacity = float(self.capacity_input.text()) self.charge_cutoff = float(self.charge_cutoff_input.text()) self.c_rate = float(self.c_rate_input.text()) # Validate inputs if self.capacity <= 0: raise ValueError("Battery capacity must be positive") if self.c_rate <= 0: raise ValueError("C-rate must be positive") test_current = self.c_rate * self.capacity if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") # Clear ALL previous data completely with self.plot_mutex: self.time_data.clear() self.voltage_data.clear() self.current_data.clear() # Reset capacities and timing self.start_time = time.time() self.last_update_time = self.start_time self.capacity_ah = 0.0 self.energy = 0.0 self.cycle_count = 1 # Reset measurement thread if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() self.measurement_thread.voltage_window.clear() self.measurement_thread.current_window.clear() with self.measurement_thread.measurement_queue.mutex: self.measurement_thread.measurement_queue.queue.clear() # Reset plot self.reset_plot() # Start test self.test_running = True self.start_time = time.time() self.last_update_time = time.time() self.test_phase = "Charge" self.phase_label.setText(self.test_phase) self.toggle_button.setChecked(True) self.toggle_button.setText("STOP") self.apply_button_style() self.status_bar.showMessage(f"Charge started @ {test_current:.3f}A to {self.charge_cutoff}V") # Create log file self.create_cycle_log_file() # Start charge worker in a QThread self.charge_thread = QThread() self.charge_worker = ChargeWorker( self.active_device.dev, test_current, self.charge_cutoff, self ) self.charge_worker.moveToThread(self.charge_thread) # Connect signals self.charge_worker.update_status.connect(self.status_bar.showMessage) self.charge_worker.test_completed.connect(self.finalize_test) self.charge_worker.error_occurred.connect(self.handle_test_error) self.charge_worker.finished.connect(self.charge_thread.quit) self.charge_worker.finished.connect(self.charge_worker.deleteLater) self.charge_thread.finished.connect(self.charge_thread.deleteLater) # Start the thread self.charge_thread.start() QTimer.singleShot(0, self.charge_worker.run) except Exception as e: QMessageBox.critical(self, "Error", str(e)) self.toggle_button.setChecked(False) self.toggle_button.setText("START") self.apply_button_style() self.toggle_button.setEnabled(True) def start_live_monitoring(self, device): """Initialize logging for a specific device (replaces create_device_log_file)""" try: if device.is_recording: return True # Already recording # Ensure log directory exists os.makedirs(device.log_dir, exist_ok=True) if not os.access(device.log_dir, os.W_OK): QMessageBox.critical(self, "Error", f"No write permissions in {device.log_dir}") return False # Generate filename with device serial and timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") clean_serial = device.serial.replace(":", "_")[-8:] # Clean for filename filename = os.path.join( device.log_dir, f"battery_test_{clean_serial}_{timestamp}.csv" ) # Open file and initialize writer device.log_file = open(filename, 'w', newline='') device.log_writer = csv.writer(device.log_file) # Write comprehensive header device.log_file.write(f"# ADALM1000 Battery Test Log\n") device.log_file.write(f"# Device Serial: {device.serial}\n") device.log_file.write(f"# Start Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") device.log_file.write(f"# Test Mode: {self.current_mode}\n") if hasattr(self, 'capacity_input'): device.log_file.write(f"# Battery Capacity: {self.capacity_input.text()} Ah\n") # Mode-specific parameters if self.current_mode == "Cycle Test": device.log_file.write(f"# Charge Cutoff: {self.charge_cutoff_input.text()} V\n") device.log_file.write(f"# Discharge Cutoff: {self.discharge_cutoff_input.text()} V\n") device.log_file.write(f"# Rest Time: {self.rest_time_input.text()} h\n") device.log_file.write("#\n") # End of header # Write column headers if self.current_mode == "Cycle Test": device.log_writer.writerow([ "Time(s)", "Voltage(V)", "Current(A)", "Phase", "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", "Coulomb_Eff(%)", "Cycle" ]) else: device.log_writer.writerow([ "Time(s)", "Voltage(V)", "Current(A)", "Phase", "Capacity(Ah)", "Power(W)", "Energy(Wh)" ]) device.is_recording = True device._last_log_time = 0 return True except Exception as e: error_msg = f"Failed to start recording for {device.serial}:\n{str(e)}" print(error_msg) QMessageBox.critical(self, "Recording Error", error_msg) if device.log_file: try: device.log_file.close() except: pass device.log_file = None device.log_writer = None return False def finalize_device_log_file(self, device): """Properly finalize and close a device's log file""" if not device.is_recording or not device.log_file: return try: # Write test summary footer device.log_file.write("\n# TEST SUMMARY\n") device.log_file.write(f"# End Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") if device.time_data: duration = device.time_data[-1] if device.time_data else 0 device.log_file.write(f"# Duration: {self.format_time(duration)}\n") device.log_file.write(f"# Final Voltage: {device.voltage_data[-1]:.4f} V\n") device.log_file.write(f"# Final Current: {device.current_data[-1]:.4f} A\n") device.log_file.write(f"# Total Capacity: {device.capacity_ah:.6f} Ah\n") device.log_file.write(f"# Total Energy: {device.energy:.6f} Wh\n") if self.current_mode == "Cycle Test": device.log_file.write(f"# Cycles Completed: {device.cycle_count}\n") if device.charge_capacity > 0: device.log_file.write(f"# Coulomb Efficiency: {device.coulomb_efficiency:.2f}%\n") except Exception as e: print(f"Error writing footer for {device.serial}: {str(e)}") finally: try: device.log_file.close() except: pass device.log_file = None device.log_writer = None device.is_recording = False def format_time(self, seconds): """Convert seconds to hh:mm:ss format""" hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = int(seconds % 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" @safe_execute def stop_test(self): """Request immediate stop with proper visual feedback""" # Immediate red button feedback self.toggle_button.setStyleSheet(f""" QPushButton {{ background-color: {self.status_colors["warning"]}; color: white; font-weight: bold; }} """) QApplication.processEvents() if not self.test_running: self.reset_button_state() return try: # Stop operations self.request_stop = True self.test_running = False self.measuring = False # Stop workers workers = ['test_sequence_worker', 'discharge_worker', 'charge_worker'] for worker in workers: if hasattr(self, worker): getattr(self, worker).stop() # Reset device if self.active_device: self.active_device.dev.channels['A'].mode = pysmu.Mode.HI_Z self.active_device.dev.channels['A'].constant(0) self.handle_device_connection(True) # Confirm connection except Exception as e: self.handle_device_connection(False, e) finally: # Clean up self.reset_test_data() self.reset_plot() self.reset_button_state() if hasattr(self, 'current_cycle_file'): self.finalize_log_file() self.status_bar.showMessage("Test stopped") def set_connection_status(self, text, color=None): """Update connection status with optional color""" if color is None: if "error" in text.lower(): color = self.status_colors["error"] elif "disconnect" in text.lower(): color = self.status_colors["disconnected"] else: color = self.status_colors["connected"] self.connection_label.setText(text) self.status_light.setStyleSheet(f""" background-color: {color}; border-radius: 10px; min-width: 12px; max-width: 12px; min-height: 12px; max-height: 12px; """) QApplication.processEvents() def reset_button_state(self): """Reset button to appropriate default state""" mode = getattr(self, 'current_mode', 'Live Monitoring') text = { 'Cycle Test': "START CYCLE TEST", 'Discharge Test': "START DISCHARGE", 'Charge Test': "START CHARGE" }.get(mode, "START") self.toggle_button.setChecked(False) self.toggle_button.setText(text) self.apply_button_style() self.toggle_button.setStyleSheet(f""" QPushButton {{ background-color: {self.success_color}; color: {self.fg_color}; font-weight: bold; }} QPushButton:checked {{ background-color: {self.warning_color}; }} QPushButton:disabled {{ background-color: #4C566A; }} """) def reset_button_style(self): """Reset button to default style""" mode = getattr(self, 'current_mode', 'Live Monitoring') text = { 'Cycle Test': "START CYCLE TEST", 'Discharge Test': "START DISCHARGE", 'Charge Test': "START CHARGE", }.get(mode, "START") self.toggle_button.setText(text) self.toggle_button.setStyleSheet(f""" QPushButton {{ background-color: {self.success_color}; color: {self.fg_color}; font-weight: bold; }} QPushButton:checked {{ background-color: {self.warning_color}; }} """) def closeEvent(self, event): """Ensure clean shutdown""" self.stop_test() # Stop all timers self.status_timer.stop() # Close all log files for device in self.devices.values(): if device.is_recording: self.finalize_device_log_file(device) # Clean up threads if hasattr(self, 'measurement_thread') and self.measurement_thread: self.measurement_thread.stop() self.measurement_thread.wait(1000) event.accept() def reset_test_data(self): if not self.active_device: return dev = self.active_device # Clear all data buffers dev.time_data.clear() dev.voltage_data.clear() dev.current_data.clear() # Display-Daten ebenfalls zurücksetzen dev.display_time_data.clear() dev.display_voltage_data.clear() dev.display_current_data.clear() # Reset statistics dev.capacity_ah = 0.0 dev.energy = 0.0 dev.charge_capacity = 0.0 dev.coulomb_efficiency = 0.0 dev.test_phase = "Idle" # Reset UI displays self.voltage_label.setText("0.000") self.current_label.setText("0.000") #self.time_label.setText("00:00:00") self.capacity_label.setText("0.0000") self.energy_label.setText("0.0000") self.phase_label.setText("Idle") def finalize_test(self): """Final cleanup after test completes or is stopped""" try: # 1. Stop any active measurement or test operations self.measuring = False self.test_running = False # 2. Reset device to safe state try: self.active_device.dev.channels['A'].mode = pysmu.Mode.HI_Z self.active_device.dev.channels['A'].constant(0) self.active_device.dev.channels['B'].mode = pysmu.Mode.HI_Z self.active_device.dev.channels['B'].constant(0) except Exception as e: print(f"Error resetting device in finalize: {e}") # 3. Clean up test sequence thread safely if hasattr(self, 'test_sequence_thread'): try: if self.test_sequence_thread.isRunning(): if hasattr(self, 'test_sequence_worker'): try: self.test_sequence_worker.stop() except RuntimeError: pass self.test_sequence_thread.quit() self.test_sequence_thread.wait(500) except Exception as e: print(f"Error stopping test sequence thread: {e}") finally: if hasattr(self, 'test_sequence_worker'): try: if not sip.isdeleted(self.test_sequence_worker): self.test_sequence_worker.deleteLater() except: pass if hasattr(self, 'test_sequence_thread'): try: if not sip.isdeleted(self.test_sequence_thread): self.test_sequence_thread.deleteLater() except: pass finally: if hasattr(self, 'test_sequence_thread'): del self.test_sequence_thread # 4. Clean up discharge thread safely if hasattr(self, 'discharge_thread'): try: if self.discharge_thread.isRunning(): if hasattr(self, 'discharge_worker'): try: self.discharge_worker.stop() except RuntimeError: pass self.discharge_thread.quit() self.discharge_thread.wait(500) except Exception as e: print(f"Error stopping discharge thread: {e}") finally: if hasattr(self, 'discharge_worker'): try: if not sip.isdeleted(self.discharge_worker): self.discharge_worker.deleteLater() except: pass if hasattr(self, 'discharge_thread'): try: if not sip.isdeleted(self.discharge_thread): self.discharge_thread.deleteLater() except: pass finally: if hasattr(self, 'discharge_thread'): del self.discharge_thread # 5. Clean up charge thread safely (using same pattern as discharge thread) if hasattr(self, 'charge_thread'): try: if self.charge_thread.isRunning(): if hasattr(self, 'charge_worker'): try: self.charge_worker.stop() except RuntimeError: pass self.charge_thread.quit() self.charge_thread.wait(500) except Exception as e: print(f"Error stopping charge thread: {e}") finally: if hasattr(self, 'charge_worker'): try: if not sip.isdeleted(self.charge_worker): self.charge_worker.deleteLater() except: pass if hasattr(self, 'charge_thread'): try: if not sip.isdeleted(self.charge_thread): self.charge_thread.deleteLater() except: pass finally: if hasattr(self, 'charge_thread'): del self.charge_thread # 6. Finalize log file self.finalize_log_file() # 7. Reset UI and state self.request_stop = False self.toggle_button.setChecked(False) self.toggle_button.setText("START") self.apply_button_style() self.toggle_button.setEnabled(True) self.apply_button_style() # Add this line self.test_running = False # 8. Show completion message if test wasn't stopped by user if not self.request_stop: test_current = self.c_rate * self.capacity test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" if self.current_mode == "Cycle Test": message = ( f"Cycle test completed | " f"Cycle {self.cycle_count} | " f"Capacity: {self.capacity_ah:.4f}Ah | " f"Efficiency: {self.coulomb_efficiency:.1f}%" ) QMessageBox.information( self, "Test Completed", f"Cycle test completed successfully.\n\n" f"Test Parameters:\n" f"- Capacity: {self.capacity} Ah\n" f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n" f"- Charge Cutoff: {self.charge_cutoff} V\n" f"- Discharge Cutoff: {self.discharge_cutoff} V\n" f"- Conditions: {test_conditions}\n\n" f"Results:\n" f"- Cycles: {self.cycle_count}\n" f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n" f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%" ) elif self.current_mode == "Discharge Test": message = ( f"Discharge completed | " f"Capacity: {self.capacity_ah:.4f}Ah | " f"Energy: {self.energy:.4f}Wh" ) QMessageBox.information( self, "Discharge Completed", f"Discharge test completed successfully.\n\n" f"Test Parameters:\n" f"- Capacity: {self.capacity} Ah\n" f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n" f"- Discharge Cutoff: {self.discharge_cutoff} V\n" f"- Conditions: {test_conditions}\n\n" f"Results:\n" f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n" f"- Energy delivered: {self.energy:.4f}Wh" ) self.status_bar.showMessage(message) except Exception as e: print(f"Error in finalize_test: {e}") import traceback traceback.print_exc() # Ensure we don't leave the UI in a locked state self.toggle_button.setChecked(False) self.toggle_button.setText("START") self.apply_button_style() self.toggle_button.setEnabled(True) self.status_bar.showMessage("Error during test finalization") def reset_plot(self): """Completely reset the plot to initial state""" # Clear plot data self.line_voltage.set_data([], []) self.line_current.set_data([], []) # Reset axes self.ax.set_xlim(0, 10) self.ax.set_ylim(0, 5.0) # Full voltage range self.ax2.set_ylim(-0.25, 0.25) # Full current range # Redraw with slight delay to ensure UI updates QTimer.singleShot(50, self.canvas.draw_idle) def update_status_and_plot(self): """Combined status and plot update""" self.update_status() self.update_plot() def update_plot(self): """Fixed plot method with safe attribute access""" try: if not self.active_device: return # Create local copies of data safely dev = self.active_device with self.plot_mutex: if not dev.display_time_data: return x_data = list(dev.display_time_data) y1_data = list(dev.display_voltage_data) y2_data = list(dev.display_current_data) # Update plot data self.line_voltage.set_data(x_data, y1_data) self.line_current.set_data(x_data, y2_data) # Auto-scale when needed if len(x_data) > 1: self.auto_scale_axes() # Force redraw self.canvas.draw_idle() except Exception as e: print(f"Plot error: {e}") # Attempt to recover try: self.reset_plot() except: pass def auto_scale_axes(self): """Auto-scale plot axes with appropriate padding and strict boundaries""" if not self.active_device or not self.active_device.time_data: return dev = self.active_device min_time = 0 max_time = dev.time_data[-1] current_xlim = self.ax.get_xlim() if max_time > current_xlim[1] * 0.95: new_max = max_time * 1.05 self.ax.set_xlim(min_time, new_max) self.ax2.set_xlim(min_time, new_max) voltage_padding = 0.2 if dev.voltage_data: min_voltage = max(0, min(dev.voltage_data) - voltage_padding) max_voltage = min(5.0, max(dev.voltage_data) + voltage_padding) current_ylim = self.ax.get_ylim() if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1): self.ax.set_ylim(min_voltage, max_voltage) current_padding = 0.05 if dev.current_data: min_current = max(-0.25, min(dev.current_data) - current_padding) max_current = min(0.25, max(dev.current_data) + current_padding) current_ylim2 = self.ax2.get_ylim() if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02): self.ax2.set_ylim(min_current, max_current) @pyqtSlot(str) def handle_device_error(self, error_msg): """Handle device errors with proper connection status""" self.handle_device_connection(False, f"Error: {error_msg}") self.reconnect_btn.setVisible(True) self.reconnect_btn.setEnabled(True) self.toggle_button.setEnabled(False) def validate_measurements(self, voltage, current): """Filter out invalid measurements""" # Fix negative values caused by connection issues if voltage < 0 or not (0 <= voltage <= 5.0): return False if abs(current) > 0.3: # Beyond ADALM1000's ±200mA range return False return True @pyqtSlot(str) def update_test_phase(self, phase_text): """Update the test phase display""" self.test_phase = phase_text self.phase_label.setText(phase_text) @pyqtSlot(str) def handle_test_error(self, error_msg): """Handle errors from the test sequence with complete cleanup""" try: # 1. Notify user QMessageBox.critical(self, "Test Error", f"An error occurred:\n{error_msg}\n\nAttempting to recover...") # 2. Stop all operations self.stop_test() # 3. Reset UI elements if hasattr(self, 'line_voltage'): try: self.line_voltage.set_data([], []) self.line_current.set_data([], []) self.ax.set_xlim(0, 1) self.ax2.set_xlim(0, 1) self.canvas.draw() except Exception as plot_error: print(f"Plot reset error: {plot_error}") # 4. Update status self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...") self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") # 5. Attempt recovery QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect except Exception as e: print(f"Error in error handler: {e}") # Fallback - restart application? QMessageBox.critical(self, "Fatal Error", "The application needs to restart due to an unrecoverable error") QTimer.singleShot(1000, self.close) def attempt_reconnect(self): """Attempt to reconnect automatically""" QMessageBox.critical( self, "Device Connection Error", "Could not connect to ADALM1000\n\n" "1. Check USB cable connection\n" "2. The device will attempt to reconnect automatically" ) QTimer.singleShot(1000, self.reconnect_device) def cleanup_test_threads(self): """Clean up any existing test threads before starting a new test""" # Stop and clean up test sequence thread if it exists if hasattr(self, 'test_sequence_thread'): try: if hasattr(self, 'test_sequence_worker'): self.test_sequence_worker.stop() self.test_sequence_thread.quit() self.test_sequence_thread.wait(500) except Exception as e: print(f"Error cleaning up test sequence thread: {e}") finally: if hasattr(self, 'test_sequence_worker'): try: self.test_sequence_worker.deleteLater() except: pass if hasattr(self, 'test_sequence_thread'): try: self.test_sequence_thread.deleteLater() except: pass # Stop and clean up discharge thread if it exists if hasattr(self, 'discharge_thread'): try: if hasattr(self, 'discharge_worker'): self.discharge_worker.stop() self.discharge_thread.quit() self.discharge_thread.wait(500) except Exception as e: print(f"Error cleaning up discharge thread: {e}") finally: if hasattr(self, 'discharge_worker'): try: self.discharge_worker.deleteLater() except: pass if hasattr(self, 'discharge_thread'): try: self.discharge_thread.deleteLater() except: pass # Stop and clean up charge thread if it exists if hasattr(self, 'charge_thread'): try: if hasattr(self, 'charge_worker'): self.charge_worker.stop() self.charge_thread.quit() self.charge_thread.wait(500) except Exception as e: print(f"Error cleaning up charge thread: {e}") finally: if hasattr(self, 'charge_worker'): try: self.charge_worker.deleteLater() except: pass if hasattr(self, 'charge_thread'): try: self.charge_thread.deleteLater() except: pass def reconnect_device(self): """Comprehensive device reconnection handler""" try: self.handle_device_connection(False, "Reconnecting...") # 1. Clean up existing connections if hasattr(self, 'measurement_thread') and self.measurement_thread: try: self.measurement_thread.stop() if not self.measurement_thread.wait(500): self.measurement_thread.terminate() except Exception as e: print(f"Error stopping measurement thread: {e}") if hasattr(self, 'session') and self.session: try: self.session.end() except Exception as e: print(f"Error ending session: {e}") # 2. Initialize new session self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) self.session.scan() if not self.session.devices: self.handle_device_connection(False, "No devices detected") return self.session.start(0) # 3. Re-establish device connection if hasattr(self, 'active_device') and self.active_device: # Try to find the same device by serial target_serial = self.active_device.serial for dev in self.session.devices: if dev.serial == target_serial: # Recreate the DeviceManager self.active_device = DeviceManager(dev) self.devices[target_serial] = self.active_device break else: # No previous device, just use first available dev = self.session.devices[0] self.active_device = DeviceManager(dev) self.devices[dev.serial] = self.active_device # 4. Restart measurement system self.active_device.start_measurement(self.interval) # Reconnect signals self.measurement_thread = self.active_device.measurement_thread self.measurement_thread.update_signal.connect(self.update_measurements) self.measurement_thread.error_signal.connect(self.handle_device_error) # 5. Update UI self.handle_device_connection(True, f"Reconnected: {self.active_device.serial}") self.toggle_button.setEnabled(True) # Update device dropdown self.device_combo.clear() for serial in self.devices: self.device_combo.addItem(serial) self.device_combo.setCurrentText(self.active_device.serial) except Exception as e: self.handle_device_connection(False, f"Reconnect failed: {str(e)}") if __name__ == "__main__": app = QApplication([]) try: window = BatteryTester() window.show() app.exec_() except Exception as e: import traceback traceback.print_exc() QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}")