# -*- coding: utf-8 -*- import os import time import csv from datetime import datetime import numpy as np # Suppress warnings os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false' os.environ['LIBUSB_DEBUG'] = '0' from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QPushButton, QLineEdit, QFrame, QCheckBox, QMessageBox, QFileDialog, QProgressBar) from PyQt5.QtCore import (Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread, QMutex, QMutexLocker) from PyQt5.QtGui import QColor, QPalette from collections import deque import pysmu import matplotlib matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure class DeviceDisconnectedError(Exception): """Custom exception for device disconnection events.""" pass class MeasurementThread(QThread): """Thread for continuous measurement of voltage and current.""" update_signal = pyqtSignal(float, float, float) # voltage, current, timestamp error_signal = pyqtSignal(str) status_signal = pyqtSignal(str) def __init__(self, device: object, interval: float = 0.1, start_time: float = None): """Initialize measurement thread.""" super().__init__() self._mutex = QMutex() self.data_mutex = QMutex() self.device = device self.interval = max(0.05, interval) self._running = False self.filter_window_size = 10 self.start_time = start_time if start_time else time.time() self.last_update_time = self.start_time # Configure channels self.device.channels['A'].mode = pysmu.Mode.SIMV # Channel A for current self.device.channels['B'].mode = pysmu.Mode.HI_Z # Channel B for voltage def run(self): """Measurement loop for both voltage and current.""" self._running = True voltage_window = deque() current_window = deque() self.status_signal.emit("Measurement started") self.last_update_time = time.time() while self._running: try: samples = self.device.read(self.filter_window_size, timeout=500) if not samples or len(samples) < self.filter_window_size: continue with QMutexLocker(self._mutex): # Get voltage from Channel B and current from Channel A raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage raw_current = np.mean([s[0][1] for s in samples]) # Channel A current # Apply moving average filter if len(voltage_window) >= self.filter_window_size: voltage_window.popleft() voltage_window.append(raw_voltage) if len(current_window) >= self.filter_window_size: current_window.popleft() current_window.append(raw_current) voltage = np.mean(voltage_window) current = np.mean(current_window) current_time = time.time() - self.start_time self.update_signal.emit(voltage, current, current_time) elapsed = time.time() - self.last_update_time sleep_time = max(0.01, self.interval - elapsed) time.sleep(sleep_time) self.last_update_time = time.time() except Exception as e: self.error_signal.emit(f"Measurement error: {str(e)}") break self.status_signal.emit("Measurement stopped") self._running = False def stop(self): """Safe thread termination with timeout.""" self._running = False if self.isRunning(): self.quit() if not self.wait(300): # 300ms grace period self.terminate() def is_active(self) -> bool: """Check if thread is running and updating.""" with QMutexLocker(self._mutex): return self._running and (time.time() - self.last_update_time < 2.0) class TestSequenceThread(QThread): """Thread for executing battery test sequences.""" progress_updated = pyqtSignal(float, str) # progress, phase cycle_completed = pyqtSignal(int, float, float, float) # cycle, discharge, charge, efficiency error_occurred = pyqtSignal(str) def __init__(self, parent: QObject): """Initialize test sequence thread.""" super().__init__(parent) # Pass parent to QThread self.parent = parent self._mutex = QMutex() # For thread operation control self.data_mutex = QMutex() # For data protection self._running = False def run(self): """Execute the complete test sequence with configurable modes.""" self._running = True try: test_current = float(self.parent.c_rate_input.text()) * float(self.parent.capacity_input.text()) charge_cutoff = float(self.parent.charge_cutoff_input.text()) discharge_cutoff = float(self.parent.discharge_cutoff_input.text()) cv_cutoff_current = float(self.parent.cv_cutoff_input.text()) # Add this input field while self._running and (self.parent.continuous_mode or self.parent.cycle_count == 0): with QMutexLocker(self._mutex): if self.parent.request_stop: break self.parent.cycle_count += 1 cycle = self.parent.cycle_count # CC Charge phase self._execute_phase("charge", test_current, charge_cutoff, discharge_cutoff, charge_cutoff) if not self._running: break # Optional CV Charge phase if self.parent.cv_mode_enabled: # Add checkbox in UI self._execute_cv_charge(charge_cutoff, test_current, cv_cutoff_current) if not self._running: break # Rest after charge self._execute_rest("post-charge") if not self._running: break # Discharge phase (CC) self._execute_phase("discharge", test_current, discharge_cutoff, discharge_cutoff, charge_cutoff) if not self.parent.continuous_mode: break # Rest after discharge if self._running: self._execute_rest("post-discharge") # Calculate efficiency if self.parent.charge_capacity > 0: efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100 self.cycle_completed.emit(cycle, self.parent.capacity_ah, self.parent.charge_capacity, efficiency) except Exception as e: self.error_occurred.emit(str(e)) finally: self._running = False def _execute_cv_charge(self, target_voltage: float, current_limit: float, cutoff_current: float): """Execute constant voltage charge phase.""" try: self.progress_updated.emit(0.0, "CV Charge") # Configure for CV mode self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV self.parent.dev.channels['A'].constant(target_voltage) start_time = time.time() last_update = start_time while self._running: with QMutexLocker(self._mutex): if self.parent.request_stop: break if not self.parent.test_data['voltage']: time.sleep(0.1) continue current_voltage = self.parent.test_data['voltage'][-1] current_current = abs(self.parent.test_data['current'][-1]) current_time = time.time() delta_t = current_time - last_update last_update = current_time # Update charge capacity self.parent.charge_capacity += current_current * delta_t / 3600 # Calculate progress based on current progress = 1 - (current_current / current_limit) progress = max(0.0, min(1.0, progress)) self.progress_updated.emit(progress, "CV Charge") # Check termination condition if current_current <= cutoff_current: break time.sleep(0.1) except Exception as e: self.error_occurred.emit(f"CV Charge error: {str(e)}") raise finally: self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z def _execute_phase(self, phase: str, current: float, target_voltage: float, discharge_cutoff: float, charge_cutoff: float): try: print(f"\n=== Starting {phase} phase ===") print(f"Device session active: {self.parent.session_active}") print(f"Channel A mode before: {self.parent.dev.channels['A'].mode}") # Reset channel first with longer delay self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z self.parent.dev.channels['A'].constant(0) time.sleep(1.0) # Increased settling time # Configure for current mode with explicit setup self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV time.sleep(0.5) # Additional delay for mode change # Set current with proper polarity if phase == "charge": print(f"Starting CHARGE at {current}A to {target_voltage}V") self.parent.dev.channels['A'].constant(current) else: print(f"Starting DISCHARGE at {-current}A to {target_voltage}V") self.parent.dev.channels['A'].constant(-current) time.sleep(1.0) # Allow more settling time print(f"Set constant current to: {current} A on channel A") # Verify current setting with more samples samples = self.parent.dev.read(50, timeout=2000) # More samples, longer timeout measured_current = np.mean([s[0][1] for s in samples]) print(f"Requested {current}A, Measured {measured_current:.6f}A") # Additional debug info print(f"Channel A mode: {self.parent.dev.channels['A'].mode}") if abs(measured_current) < 0.001: # If current is still near zero print("Warning: Current not being applied - checking connection") # Try resetting the device session self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z time.sleep(0.5) self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV time.sleep(0.5) self.parent.dev.channels['A'].constant(current if phase == "charge" else -current) time.sleep(1.0) # Re-measure samples = self.parent.dev.read(50, timeout=2000) measured_current = np.mean([s[0][1] for s in samples]) print(f"After reset: Requested {current}A, Measured {measured_current:.6f}A") if abs(measured_current) < 0.001: # Still no current raise RuntimeError(f"Failed to apply {current}A - check device connection and battery") # Main phase loop start_time = time.time() last_update = start_time while self._running: with QMutexLocker(self._mutex): if self.parent.request_stop: break if not self.parent.test_data['voltage']: time.sleep(0.1) continue current_voltage = self.parent.test_data['voltage'][-1] current_time = time.time() delta_t = current_time - last_update last_update = current_time # Update capacity if phase == "charge": self.parent.charge_capacity += abs(self.parent.test_data['current'][-1]) * delta_t / 3600 progress = (current_voltage - discharge_cutoff) / (target_voltage - discharge_cutoff) else: self.parent.capacity_ah += abs(self.parent.test_data['current'][-1]) * delta_t / 3600 progress = (charge_cutoff - current_voltage) / (charge_cutoff - target_voltage) progress = max(0.0, min(1.0, progress)) self.progress_updated.emit(progress, f"{phase.capitalize()}ing") # Check termination conditions if ((phase == "charge" and current_voltage >= target_voltage) or (phase == "discharge" and current_voltage <= target_voltage)): break time.sleep(0.1) except AttributeError as e: if '_constant' in str(e): print("Warning: Internal attribute check failed, but current setting should still work") # Continue with the test else: self.error_occurred.emit(f"{phase.capitalize()} error: {str(e)}") raise except Exception as e: self.error_occurred.emit(f"{phase.capitalize()} error: {str(e)}") raise finally: # Ensure channel is reset even if error occurs try: self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z self.parent.dev.channels['A'].constant(0) except: pass # Best effort cleanup class BatteryTester(QMainWindow): """Main application window for battery capacity testing.""" error_signal = pyqtSignal(str) def __init__(self): """Initialize the battery tester application with enhanced features.""" super().__init__() self.error_signal.connect(self.handle_device_error) # Initialize thread-safety objects self._mutex = QMutex() # For general thread safety self.data_mutex = QMutex() # For test data protection # Enhanced data structure for flexible mode support self.test_data = { 'time': deque(), 'voltage': deque(), 'current': deque(), 'mode': deque(), # Tracks operation mode (CC, CV, etc.) 'phase': deque(), # Tracks charge/discharge/rest 'capacity': deque(), 'energy': deque() } # Test control variables self.test_phase = "Ready" self.current_mode = "None" # Tracks current operation mode self.capacity_ah = 0.0 # Discharge capacity self.charge_capacity = 0.0 # Charge capacity self.energy_wh = 0.0 # Energy in watt-hours self.coulomb_efficiency = 0.0 self.cycle_count = 0 self.cycle_data = [] # Stores cycle statistics # Color scheme self.bg_color = QColor(46, 52, 64) self.fg_color = QColor(216, 222, 233) self.accent_color = QColor(94, 129, 172) self.warning_color = QColor(191, 97, 106) self.success_color = QColor(163, 190, 140) self.cv_color = QColor(143, 188, 187) # Additional color for CV mode # Device and test status self.session_active = False self.measuring = False self.test_running = False self.continuous_mode = False self.cv_mode_enabled = False # CV charge mode flag self.request_stop = False self.interval = 0.1 # Measurement interval # Logging configuration self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) self.start_time = time.time() self.log_buffer = [] self.current_cycle_file = None # Thread management self.measurement_thread = None self.test_thread = None # Initialize UI with all controls self._setup_ui() # Initialize device with delay to avoid USB issues QTimer.singleShot(100, self.safe_init_device) def _setup_ui(self): """Configure the user interface.""" self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)") self.resize(1000, 800) self.setMinimumSize(800, 700) # Set color palette palette = self.palette() palette.setColor(QPalette.Window, self.bg_color) palette.setColor(QPalette.WindowText, self.fg_color) palette.setColor(QPalette.Base, QColor(59, 66, 82)) palette.setColor(QPalette.Text, self.fg_color) palette.setColor(QPalette.Button, self.accent_color) palette.setColor(QPalette.ButtonText, self.fg_color) self.setPalette(palette) # Main widget and layout self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.main_layout = QVBoxLayout(self.central_widget) self.main_layout.setContentsMargins(10, 10, 10, 10) self.main_layout.setSpacing(10) # Header section header_frame = QWidget() header_layout = QHBoxLayout(header_frame) header_layout.setContentsMargins(0, 0, 0, 0) self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)") self.title_label.setStyleSheet(f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};") header_layout.addWidget(self.title_label, 1) # Connection indicator self.connection_label = QLabel("Disconnected") header_layout.addWidget(self.connection_label) self.status_light = QLabel() self.status_light.setFixedSize(20, 20) self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") header_layout.addWidget(self.status_light) # Reconnect button self.reconnect_btn = QPushButton("Connect") self.reconnect_btn.clicked.connect(self.reconnect_device) header_layout.addWidget(self.reconnect_btn) self.main_layout.addWidget(header_frame) # Measurement display display_frame = QFrame() display_frame.setFrameShape(QFrame.StyledPanel) display_layout = QGridLayout(display_frame) display_layout.setHorizontalSpacing(15) display_layout.setVerticalSpacing(8) measurement_labels = [ ("Voltage", "V"), ("Current", "A"), ("Test Phase", ""), ("Elapsed Time", "s"), ("Discharge Capacity", "Ah"), ("Charge Capacity", "Ah"), ("Coulomb Efficiency", "%"), ("Cycle Count", ""), ] self.value_labels = {} # Optional: Store labels for easy update for i, (label, unit) in enumerate(measurement_labels): row = i // 2 col = (i % 2) * 3 # Each block is 3 columns: label | value | unit name_label = QLabel(label + ":") name_label.setStyleSheet("font-size: 11px;") display_layout.addWidget(name_label, row, col) value_label = QLabel("0.000") value_label.setStyleSheet("font-size: 12px; font-weight: bold; min-width: 60px;") display_layout.addWidget(value_label, row, col + 1) unit_label = QLabel(unit) unit_label.setStyleSheet("font-size: 11px; color: gray;") display_layout.addWidget(unit_label, row, col + 2) # Save reference for updating later self.value_labels[label] = value_label # Assign to instance attributes for specific fields if label == "Voltage": self.voltage_label = value_label elif label == "Current": self.current_label = value_label elif label == "Test Phase": self.phase_label = value_label elif label == "Elapsed Time": self.time_label = value_label elif label == "Discharge Capacity": self.capacity_label = value_label elif label == "Charge Capacity": self.charge_capacity_label = value_label elif label == "Coulomb Efficiency": self.efficiency_label = value_label elif label == "Cycle Count": self.cycle_label = value_label self.main_layout.addWidget(display_frame) # Progress bar self.progress_bar = QProgressBar() self.progress_bar.setRange(0, 100) self.progress_bar.setTextVisible(False) self.progress_bar.setStyleSheet(f""" QProgressBar {{ border: 1px solid {self.fg_color.name()}; border-radius: 5px; text-align: center; }} QProgressBar::chunk {{ background-color: {self.accent_color.name()}; }} """) self.main_layout.addWidget(self.progress_bar) # Control section controls_frame = QWidget() controls_layout = QHBoxLayout(controls_frame) controls_layout.setContentsMargins(0, 0, 0, 0) # Parameters frame params_frame = QFrame() params_frame.setFrameShape(QFrame.StyledPanel) params_layout = QGridLayout(params_frame) # Add CV cutoff current input params_layout.addWidget(QLabel("CV Cutoff Current (A):"), 4, 0) self.cv_cutoff_input = QLineEdit("0.02") # 20mA default self.cv_cutoff_input.setFixedWidth(80) self.cv_cutoff_input.setToolTip("Current at which CV charging should stop") params_layout.addWidget(self.cv_cutoff_input, 4, 1) # Battery capacity params_layout.addWidget(QLabel("Battery Capacity (Ah):"), 0, 0) self.capacity_input = QLineEdit("0.2") self.capacity_input.setFixedWidth(80) self.capacity_input.setToolTip("Nominal capacity of the battery in Amp-hours") params_layout.addWidget(self.capacity_input, 0, 1) # Charge cutoff voltage params_layout.addWidget(QLabel("Charge Cutoff (V):"), 1, 0) self.charge_cutoff_input = QLineEdit("1.43") self.charge_cutoff_input.setFixedWidth(80) self.charge_cutoff_input.setToolTip("Voltage at which charging should stop") params_layout.addWidget(self.charge_cutoff_input, 1, 1) # Discharge cutoff voltage params_layout.addWidget(QLabel("Discharge Cutoff (V):"), 2, 0) self.discharge_cutoff_input = QLineEdit("0.9") self.discharge_cutoff_input.setFixedWidth(80) self.discharge_cutoff_input.setToolTip("Voltage at which discharging should stop") params_layout.addWidget(self.discharge_cutoff_input, 2, 1) # Rest time params_layout.addWidget(QLabel("Rest Time (hours):"), 3, 0) self.rest_time_input = QLineEdit("0.25") self.rest_time_input.setFixedWidth(80) self.rest_time_input.setToolTip("Rest period between charge/discharge cycles") params_layout.addWidget(self.rest_time_input, 3, 1) # C-Rate for test params_layout.addWidget(QLabel("Test C-Rate:"), 0, 2) self.c_rate_input = QLineEdit("0.1") self.c_rate_input.setFixedWidth(60) self.c_rate_input.setToolTip("Charge/discharge rate relative to battery capacity (e.g., 0.2 for C/5)") params_layout.addWidget(self.c_rate_input, 0, 3) params_layout.addWidget(QLabel("(e.g., 0.2 for C/5)"), 0, 4) controls_layout.addWidget(params_frame, 1) # Button area button_frame = QWidget() button_layout = QVBoxLayout(button_frame) button_layout.setContentsMargins(0, 0, 0, 0) # Add CV mode checkbox and cutoff current input self.cv_mode_check = QCheckBox("Enable CV Charge") self.cv_mode_check.setChecked(False) self.cv_mode_check.setToolTip("Enable constant voltage charge phase after CC charge") button_layout.addWidget(self.cv_mode_check) self.start_button = QPushButton("START TEST") self.start_button.clicked.connect(self.start_test) self.start_button.setStyleSheet(f""" QPushButton {{ background-color: {self.accent_color.name()}; font-weight: bold; padding: 8px; border-radius: 5px; }} QPushButton:disabled {{ background-color: #4C566A; }} """) self.start_button.setEnabled(False) button_layout.addWidget(self.start_button) self.stop_button = QPushButton("STOP TEST") self.stop_button.clicked.connect(self.stop_test) self.stop_button.setStyleSheet(f""" QPushButton {{ background-color: {self.warning_color.name()}; font-weight: bold; padding: 8px; border-radius: 5px; }} QPushButton:disabled {{ background-color: #4C566A; }} """) self.stop_button.setEnabled(False) button_layout.addWidget(self.stop_button) # Continuous mode checkbox self.continuous_check = QCheckBox("Continuous Mode") self.continuous_check.setChecked(True) self.continuous_check.setToolTip("Run multiple charge/discharge cycles until stopped") button_layout.addWidget(self.continuous_check) button_layout.addWidget(self.cv_mode_check) controls_layout.addWidget(button_frame) self.main_layout.addWidget(controls_frame) # Plot area self._setup_plot() self.main_layout.addWidget(self.plot_widget, 1) # Status bar self.status_bar = QLabel("Ready") self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;") self.main_layout.addWidget(self.status_bar) def _setup_plot(self): """Configure the matplotlib plot.""" self.plot_widget = QWidget() plot_layout = QVBoxLayout(self.plot_widget) self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440') self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) self.ax = self.fig.add_subplot(111) self.ax.set_facecolor('#3B4252') # Initial voltage range voltage_padding = 0.2 min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) # Voltage plot self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) self.ax.set_ylabel("Voltage (V)", color='#00BFFF') self.ax.tick_params(axis='y', labelcolor='#00BFFF') # Current plot (right axis) self.ax2 = self.ax.twinx() current_padding = 0.05 test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text()) max_current = test_current * 1.5 self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) self.ax2.set_ylabel("Current (A)", color='r') self.ax2.tick_params(axis='y', labelcolor='r') self.ax.set_xlabel('Time (s)', color=self.fg_color.name()) self.ax.set_title('Battery Test (CC)', color=self.fg_color.name()) self.ax.tick_params(axis='x', colors=self.fg_color.name()) self.ax.grid(True, color='#4C566A') # Position legends self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) # Embed plot self.canvas = FigureCanvas(self.fig) plot_layout.addWidget(self.canvas) def safe_init_device(self): """Safe device initialization with error handling.""" try: self.init_device() except Exception as e: self.handle_device_error(str(e)) def init_device(self): """Initialize the ADALM1000 device.""" # Temporarily enable USB debugging os.environ['LIBUSB_DEBUG'] = '3' # Set to 0 in production self.cleanup_device() try: print("Waiting before initializing session...") time.sleep(1.5) # Delay helps avoid "device busy" issues self.session = pysmu.Session() # 🔍 Log detected devices print(f"Devices found: {self.session.devices}") if not self.session.devices: raise Exception("No ADALM1000 detected - check USB connection") self.dev = self.session.devices[0] # Reset channels self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].constant(0) self.session.start(0) # Update UI self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") self.connection_label.setText("Connected") self.status_bar.setText("Device connected | Ready for measurement") self.session_active = True self.start_button.setEnabled(True) # Start measurement thread self.start_measurement_thread() except Exception as e: raise Exception(f"Device initialization failed: {str(e)}") def cleanup_device(self): """Clean up device resources efficiently with essential safeguards.""" print("Cleaning up device session...") # Stop threads with timeout for thread in [self.measurement_thread, getattr(self, 'test_thread', None)]: if thread and thread.isRunning(): try: thread.stop() if not thread.wait(800): # Reduced timeout thread.terminate() print(f"Warning: {thread.__class__.__name__} required termination") except Exception as e: print(f"Error stopping {thread.__class__.__name__}: {str(e)}") # Clean up session if it exists if getattr(self, 'session', None): try: if self.session_active: # Quick channel reset if device exists if hasattr(self, 'dev'): try: self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z except: pass # Best effort cleanup self.session.end() del self.session except Exception as e: print(f"Session cleanup error: {str(e)}") finally: self.session_active = False # Reset all states and UI self.measuring = False self.test_running = False self.request_stop = True self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") self.connection_label.setText("Disconnected") self.start_button.setEnabled(False) self.stop_button.setEnabled(False) def start_measurement_thread(self): """Start the measurement thread.""" if not hasattr(self, 'dev') or self.dev is None: print("Device not initialized, cannot start measurement thread") return if self.measurement_thread is not None: self.measurement_thread.stop() self.measurement_thread.wait(500) self.measurement_thread = MeasurementThread( device=self.dev, interval=self.interval, start_time=self.start_time ) self.measurement_thread.update_signal.connect(self.update_measurements) self.measurement_thread.error_signal.connect(self.handle_device_error) self.measurement_thread.start() def reconnect_device(self): """Attempt to reconnect the device.""" self.status_bar.setText("Attempting to reconnect...") self.cleanup_device() QTimer.singleShot(2000, self.safe_init_device) # Retry with delay def handle_device_error(self, error_msg): """Thread-safe device error handling.""" try: # Ensure this runs in the main thread if QThread.currentThread() != self.thread(): self.error_signal.emit(str(error_msg)) return print(f"Device error: {error_msg}") def update_ui(): self.status_bar.setText(f"Device error: {error_msg}") if self.isVisible(): QMessageBox.critical( self, "Device Error", f"Device error occurred:\n{error_msg}\n\n" "1. Check USB connection\n" "2. Try manual reconnect\n" "3. Restart application if problems persist" ) QTimer.singleShot(0, lambda: ( self.cleanup_device(), update_ui() )) except Exception as e: print(f"Error in error handler: {str(e)}") try: self.cleanup_device() except: pass def start_test(self): """Start the complete battery test cycle with proper file initialization.""" # Check if test is already running if self.test_running: return # Verify thread safety objects exist if not hasattr(self, 'data_mutex') or self.data_mutex is None: QMessageBox.critical(self, "Error", "Thread safety not initialized") return try: with QMutexLocker(self.data_mutex): # Get and validate input values with error handling try: capacity = float(self.capacity_input.text()) charge_cutoff = float(self.charge_cutoff_input.text()) discharge_cutoff = float(self.discharge_cutoff_input.text()) c_rate = float(self.c_rate_input.text()) cv_cutoff = float(self.cv_cutoff_input.text()) if capacity <= 0: raise ValueError("Battery capacity must be positive") if charge_cutoff <= discharge_cutoff: raise ValueError("Charge cutoff must be higher than discharge cutoff") if c_rate <= 0: raise ValueError("C-rate must be positive") test_current = c_rate * capacity if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") except ValueError as e: QMessageBox.critical(self, "Input Error", str(e)) return # Reset test state and data structures try: for key in self.test_data: self.test_data[key].clear() self.capacity_ah = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 except Exception as e: QMessageBox.critical(self, "Data Error", f"Couldn't reset test data: {str(e)}") return # Initialize timing self.start_time = time.time() self.time_label.setText("00:00:00") self.reset_plot() # Initialize log file with error handling try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv") with open(self.filename, 'w', newline='') as f: writer = csv.writer(f) writer.writerow([ 'Time(s)', 'Voltage(V)', 'Current(A)', 'Mode', 'Phase', 'Discharge(Ah)', 'Charge(Ah)', 'Efficiency(%)', 'Cycle' ]) self.current_cycle_file = open(self.filename, 'a', newline='') self.log_writer = csv.writer(self.current_cycle_file) self.log_buffer = [] except Exception as e: QMessageBox.critical( self, "File Error", f"Failed to initialize log file:\n{str(e)}\n\n" f"Check directory permissions:\n{self.log_dir}" ) return # Start test with thread safety try: self.test_running = True self.request_stop = False self.test_phase = "Initializing" self.current_mode = "CC" self.continuous_mode = self.continuous_check.isChecked() self.cv_mode_enabled = self.cv_mode_check.isChecked() # UI updates self.start_button.setEnabled(False) self.stop_button.setEnabled(True) self.status_bar.setText( f"Test started | Target: {discharge_cutoff}V @ {test_current:.3f}A | " f"CV Mode: {'ON' if self.cv_mode_enabled else 'OFF'}" ) # Start test thread with cleanup guard if self.test_thread and self.test_thread.isRunning(): self.test_thread.stop() self.test_thread = TestSequenceThread(self) self.test_thread.progress_updated.connect(self.update_test_progress) self.test_thread.cycle_completed.connect(self.update_cycle_stats) self.test_thread.error_occurred.connect(self.handle_device_error) self.test_thread.finished.connect(self.finalize_test) self.test_thread.start() except Exception as e: self.finalize_test(show_message=False) QMessageBox.critical( self, "Test Error", f"Failed to start test thread:\n{str(e)}" ) except Exception as e: self.finalize_test(show_message=False) QMessageBox.critical( self, "Critical Error", f"Unexpected error in test initialization:\n{str(e)}" ) def update_test_progress(self, progress: float, phase: str): """Update test progress and phase display.""" self.test_phase = phase self.phase_label.setText(phase) self.progress_bar.setValue(int(progress * 100)) def update_cycle_stats(self, cycle: int, discharge: float, charge: float, efficiency: float): """Update cycle statistics.""" self.cycle_count = cycle self.capacity_ah = discharge self.charge_capacity = charge self.coulomb_efficiency = efficiency self.cycle_label.setText(f"{cycle}") self.capacity_label.setText(f"{discharge:.4f}") self.charge_capacity_label.setText(f"{charge:.4f}") self.efficiency_label.setText(f"{efficiency:.1f}") self.status_bar.setText( f"Cycle {cycle} completed | " f"Discharge: {discharge:.3f}Ah | " f"Charge: {charge:.3f}Ah | " f"Efficiency: {efficiency:.1f}%" ) # Write cycle summary self.write_cycle_summary() def stop_test(self): """Safely stop the running test.""" if not self.test_running: return self.request_stop = True self.test_running = False self.measuring = False self.test_phase = "Ready" self.phase_label.setText(self.test_phase) if hasattr(self, 'dev'): try: self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) except Exception as e: print(f"Error resetting device: {e}") # Update UI self.status_bar.setText("Test stopped - Ready for new test") self.stop_button.setEnabled(False) self.start_button.setEnabled(True) self.finalize_test(show_message=False) def finalize_test(self, show_message: bool = True): """Final cleanup after test completion with robust error handling.""" # Set up error tracking errors = [] try: # 1. Handle log buffer writing if getattr(self, 'log_writer', None) is not None: try: if getattr(self, 'log_buffer', None): try: self.log_writer.writerows(self.log_buffer) self.log_buffer.clear() except Exception as e: errors.append(f"Failed to write log buffer: {str(e)}") except Exception as e: errors.append(f"Log buffer access error: {str(e)}") # 2. Handle file closure if getattr(self, 'current_cycle_file', None) is not None: try: file = self.current_cycle_file if not file.closed: try: file.flush() os.fsync(file.fileno()) except Exception as e: errors.append(f"Failed to sync file: {str(e)}") finally: file.close() except Exception as e: errors.append(f"File closure error: {str(e)}") finally: if hasattr(self, 'current_cycle_file'): del self.current_cycle_file if hasattr(self, 'log_writer'): del self.log_writer # 3. Clean up thread references try: if hasattr(self, 'test_thread') and self.test_thread is not None: if self.test_thread.isRunning(): self.test_thread.stop() if not self.test_thread.wait(500): # 500ms timeout errors.append("Test thread didn't stop cleanly") self.test_thread = None except Exception as e: errors.append(f"Thread cleanup error: {str(e)}") # 4. Reset test state try: self.test_running = False self.request_stop = True self.measuring = False self.stop_button.setEnabled(False) self.start_button.setEnabled(True) except Exception as e: errors.append(f"State reset error: {str(e)}") # 5. Show completion message if requested if show_message and self.isVisible(): try: msg = "Test completed" if errors: msg += f"\n\nMinor issues occurred:\n- " + "\n- ".join(errors) QMessageBox.information( self, "Test Complete", f"{msg}\nCycles: {self.cycle_count}", QMessageBox.Ok ) except Exception as e: print(f"Failed to show completion message: {str(e)}") except Exception as e: # Catastrophic error handling print(f"Critical error in finalize_test: {str(e)}") try: if hasattr(self, 'current_cycle_file'): try: self.current_cycle_file.close() except: pass except: pass # Ensure basic state is reset self.test_running = False self.request_stop = True self.measuring = False finally: # Final cleanup guarantees try: if hasattr(self, 'log_buffer'): self.log_buffer.clear() except: pass def update_measurements(self, voltage: float, current: float, current_time: float): """Update measurements with enhanced data structure.""" if not hasattr(self, '_mutex'): print("Critical Error: Mutex not initialized") return try: with QMutexLocker(self._mutex): # Limit data points max_points = 10000 for key in self.test_data: if len(self.test_data[key]) > max_points: self.test_data[key].popleft() # Store data self.test_data['time'].append(current_time) self.test_data['voltage'].append(voltage) self.test_data['current'].append(current) self.test_data['phase'].append(self.test_phase) self.test_data['mode'].append(self.current_mode) # Update UI self.voltage_label.setText(f"{voltage:.4f}") self.current_label.setText(f"{current:.4f}") self.time_label.setText(self.format_time(current_time)) # Update plot periodically if len(self.test_data['time']) % 10 == 0: self.update_plot() # Log data if test is running if self.test_running and hasattr(self, 'log_writer'): self.log_buffer.append([ f"{current_time:.3f}", f"{voltage:.6f}", f"{current:.6f}", self.test_phase, f"{self.capacity_ah:.4f}", f"{self.charge_capacity:.4f}", f"{self.coulomb_efficiency:.1f}", f"{self.cycle_count}" ]) # Write data in blocks if len(self.log_buffer) >= 10: try: self.log_writer.writerows(self.log_buffer) if hasattr(self, 'current_cycle_file') and self.current_cycle_file: self.current_cycle_file.flush() self.log_buffer.clear() except Exception as e: print(f"Log write error: {e}") except Exception as e: print(f"Error in update_measurements: {str(e)}") def write_cycle_summary(self): """Write cycle summary to the current cycle's log file""" if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file: return summary_line = ( f"Cycle {self.cycle_count.get()} Summary - " f"Discharge={self.capacity_ah.get():.4f}Ah, " f"Charge={self.charge_capacity.get():.4f}Ah, " f"Efficiency={self.coulomb_efficiency.get():.1f}%" ) # Ensure file is open and write summary try: if self.log_buffer: self.log_writer.writerows(self.log_buffer) self.log_buffer.clear() self.current_cycle_file.write(summary_line + "\n") self.current_cycle_file.flush() except Exception as e: print(f"Error writing cycle summary: {e}") def update_plot(self): """Update the plot with new data using test_data structure.""" if not self.test_data['time']: return self.line_voltage.set_data( list(self.test_data['time']), list(self.test_data['voltage']) ) self.line_current.set_data( list(self.test_data['time']), list(self.test_data['current']) ) self.auto_scale_axes() self.canvas.draw_idle() def auto_scale_axes(self): """Automatically adjust plot axes using test_data.""" if not self.test_data['time']: return # X-axis adjustment max_time = list(self.test_data['time'])[-1] current_xlim = self.ax.get_xlim() if max_time > current_xlim[1] * 0.95: new_xmax = max_time * 1.10 # 10% padding self.ax.set_xlim(0, new_xmax) self.ax2.set_xlim(0, new_xmax) # Y-axes adjustment if self.test_data['voltage']: voltage_padding = 0.2 min_v = max(0, min(list(self.test_data['voltage'])) - voltage_padding) max_v = min(5.0, max(list(self.test_data['voltage'])) + voltage_padding) self.ax.set_ylim(min_v, max_v) if self.test_data['current']: current_padding = 0.05 min_c = max(-0.25, min(list(self.test_data['current'])) - current_padding) max_c = min(0.25, max(list(self.test_data['current'])) + current_padding) self.ax2.set_ylim(min_c, max_c) @staticmethod def format_time(seconds: float) -> str: """Format seconds as HH:MM:SS. Args: seconds: Time in seconds Returns: Formatted time string """ hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = int(seconds % 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def reset_plot(self): """Reset the plot to initial state.""" self.line_voltage.set_data([], []) self.line_current.set_data([], []) # Reset axes to starting values self.ax.set_xlim(0, 10) voltage_padding = 0.2 min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) self.canvas.draw() def closeEvent(self, event): """Clean up when closing the window.""" self.cleanup_device() event.accept() if __name__ == "__main__": app = QApplication([]) app.setStyle('Fusion') window = BatteryTester() window.show() app.exec_()