# -*- coding: utf-8 -*- import os import time import csv from datetime import datetime import numpy as np # Suppress warnings os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false' os.environ['LIBUSB_DEBUG'] = '0' from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QPushButton, QLineEdit, QFrame, QCheckBox, QMessageBox, QFileDialog, QProgressBar) from PyQt5.QtCore import (Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread, QMutex, QMutexLocker) from PyQt5.QtGui import QColor, QPalette from collections import deque import pysmu import matplotlib matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure class DeviceDisconnectedError(Exception): """Custom exception for device disconnection events.""" pass class MeasurementThread(QThread): """Thread for continuous measurement of voltage and current.""" update_signal = pyqtSignal(float, float, float) # voltage, current, timestamp error_signal = pyqtSignal(str) status_signal = pyqtSignal(str) def __init__(self, device: object, interval: float = 0.1, start_time: float = None): """Initialize measurement thread.""" super().__init__() self.device = device self.interval = max(0.05, interval) # Minimum interval self._running = False self._mutex = QMutex() # Thread safety self.filter_window_size = 10 self.start_time = time.time() self.last_update_time = self.start_time def run(self): """Main measurement loop with enhanced error handling.""" self._running = True voltage_window = deque() current_window = deque() self.status_signal.emit("Measurement started") while self._running: try: # Read samples with timeout samples = self.device.read( self.filter_window_size, timeout=500, ) if not samples: raise DeviceDisconnectedError("No samples received - device may be disconnected") # Process samples (thread-safe) with QMutexLocker(self._mutex): raw_voltage = np.mean([s[1][0] for s in samples]) raw_current = np.mean([s[0][1] for s in samples]) if len(voltage_window) >= self.filter_window_size: voltage_window.popleft() current_window.popleft() voltage_window.append(raw_voltage) current_window.append(raw_current) voltage = np.mean(list(voltage_window)) current = np.mean(list(current_window)) current_time = time.time() - self.start_time # Emit updates self.update_signal.emit(voltage, current, current_time) self.last_update_time = time.time() # Dynamic sleep adjustment elapsed = time.time() - self.last_update_time sleep_time = max(0.01, self.interval - elapsed) time.sleep(sleep_time) except DeviceDisconnectedError as e: self.error_signal.emit(f"Device error: {str(e)}") break except Exception as e: self.error_signal.emit(f"Measurement error: {str(e)}") self.status_signal.emit(f"Error: {str(e)}") break self.status_signal.emit("Measurement stopped") self._running = False def stop(self): """Safe thread termination with timeout.""" self._running = False if self.isRunning(): self.quit() if not self.wait(300): # 300ms grace period self.terminate() def is_active(self) -> bool: """Check if thread is running and updating.""" with QMutexLocker(self._mutex): return self._running and (time.time() - self.last_update_time < 2.0) class TestSequenceThread(QThread): """Thread for executing battery test sequences.""" progress_updated = pyqtSignal(float, str) # progress, phase cycle_completed = pyqtSignal(int, float, float, float) # cycle, discharge, charge, efficiency error_occurred = pyqtSignal(str) def __init__(self, parent: QObject): """Initialize test sequence thread. Args: parent: Reference to main BatteryTester instance """ super().__init__() self.parent = parent self._mutex = QMutex() self._running = False def run(self): """Execute the complete test sequence.""" self._running = True try: test_current = float(self.parent.c_rate_input.text()) * float(self.parent.capacity_input.text()) charge_cutoff = float(self.parent.charge_cutoff_input.text()) discharge_cutoff = float(self.parent.discharge_cutoff_input.text()) while self._running and (self.parent.continuous_mode or self.parent.cycle_count == 0): with QMutexLocker(self._mutex): if self.parent.request_stop: break self.parent.cycle_count += 1 cycle = self.parent.cycle_count # Charge phase self._execute_phase("charge", test_current, charge_cutoff, discharge_cutoff, charge_cutoff) if not self._running: break # Rest after charge self._execute_rest("post-charge") if not self._running: break # Discharge phase self._execute_phase("discharge", test_current, discharge_cutoff, discharge_cutoff, charge_cutoff) if not self.parent.continuous_mode: break # Rest after discharge if self._running: self._execute_rest("post-discharge") # Calculate efficiency if self.parent.charge_capacity > 0: efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100 self.cycle_completed.emit(cycle, self.parent.capacity_ah, self.parent.charge_capacity, efficiency) except Exception as e: self.error_occurred.emit(str(e)) finally: self._running = False def _execute_phase(self, phase: str, current: float, target_voltage: float, discharge_cutoff: float, charge_cutoff: float): try: print(f"\n=== Starting {phase} phase ===") print(f"Device session active: {self.parent.session_active}") print(f"Channel A mode before: {self.parent.dev.channels['A'].mode}") # Reset channel first self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z self.parent.dev.channels['A'].constant(0) time.sleep(0.5) # Increased settling time # Configure for current mode if phase == "charge": print(f"Starting CHARGE at {current}A to {target_voltage}V") self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV time.sleep(0.1) # Additional delay self.parent.dev.channels['A'].constant(current) else: print(f"Starting DISCHARGE at {-current}A to {target_voltage}V") self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV time.sleep(0.1) # Additional delay self.parent.dev.channels['A'].constant(-current) time.sleep(0.5) # Allow more settling time # Verify current setting samples = self.parent.dev.read(10, timeout=1000) # More samples, longer timeout measured_current = np.mean([s[0][1] for s in samples]) print(f"Requested {current}A, Measured {measured_current:.6f}A") # More precision print(f"Channel A mode after: {self.parent.dev.channels['A'].mode}") # Rest of your existing loop... time.sleep(0.1) start_time = time.time() last_update = start_time while self._running: with QMutexLocker(self._mutex): if self.parent.request_stop: break if not self.parent.voltage_data: time.sleep(0.1) continue current_voltage = self.parent.voltage_data[-1] current_time = time.time() delta_t = current_time - last_update last_update = current_time # Update capacity if phase == "charge": self.parent.charge_capacity += abs(self.parent.current_data[-1]) * delta_t / 3600 progress = (current_voltage - discharge_cutoff) / (target_voltage - discharge_cutoff) else: self.parent.capacity_ah += abs(self.parent.current_data[-1]) * delta_t / 3600 progress = (charge_cutoff - current_voltage) / (charge_cutoff - target_voltage) progress = max(0.0, min(1.0, progress)) self.progress_updated.emit(progress, f"{phase.capitalize()}ing") # Check termination conditions if ((phase == "charge" and current_voltage >= target_voltage) or (phase == "discharge" and current_voltage <= target_voltage)): break time.sleep(0.1) except AttributeError as e: if '_constant' in str(e): print("Warning: Internal attribute check failed, but current setting should still work") # Continue with the test else: self.error_occurred.emit(f"{phase.capitalize()} error: {str(e)}") raise except Exception as e: self.error_occurred.emit(f"{phase.capitalize()} error: {str(e)}") raise def _execute_rest(self, phase: str): """Execute rest phase. Args: phase: Description of rest phase """ try: self.progress_updated.emit(0.0, f"Resting ({phase})") self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z self.parent.dev.channels['A'].constant(0) rest_time = float(self.parent.rest_time_input.text()) * 3600 rest_end = time.time() + rest_time while time.time() < rest_end and self._running: with QMutexLocker(self._mutex): if self.parent.request_stop: break progress = 1 - (rest_end - time.time()) / rest_time self.progress_updated.emit(progress, f"Resting ({phase})") time.sleep(1) except Exception as e: self.error_occurred.emit(f"Rest phase error: {str(e)}") raise def stop(self): """Safely stop the test sequence.""" with QMutexLocker(self._mutex): self._running = False self.wait(500) # Wait up to 500ms for clean exit class BatteryTester(QMainWindow): """Main application window for battery capacity testing.""" error_signal = pyqtSignal(str) def __init__(self): """Initialize the battery tester application.""" super().__init__() self.error_signal.connect(self.handle_device_error) # Initialize data buffers self.time_data = deque() self.voltage_data = deque() self.current_data = deque() self.phase_data = deque() # Test variables self.test_phase = "Ready" self.capacity_ah = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 # Color scheme self.bg_color = QColor(46, 52, 64) self.fg_color = QColor(216, 222, 233) self.accent_color = QColor(94, 129, 172) self.warning_color = QColor(191, 97, 106) self.success_color = QColor(163, 190, 140) # Device status self.session_active = False self.measuring = False self.test_running = False self.continuous_mode = False self.request_stop = False self.interval = 0.1 self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) self.start_time = time.time() # Thread management self.measurement_thread = None self.test_thread = None # Initialize UI self._setup_ui() # Initialize device with delay QTimer.singleShot(100, self.safe_init_device) def _setup_ui(self): """Configure the user interface.""" self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)") self.resize(1000, 800) self.setMinimumSize(800, 700) # Set color palette palette = self.palette() palette.setColor(QPalette.Window, self.bg_color) palette.setColor(QPalette.WindowText, self.fg_color) palette.setColor(QPalette.Base, QColor(59, 66, 82)) palette.setColor(QPalette.Text, self.fg_color) palette.setColor(QPalette.Button, self.accent_color) palette.setColor(QPalette.ButtonText, self.fg_color) self.setPalette(palette) # Main widget and layout self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.main_layout = QVBoxLayout(self.central_widget) self.main_layout.setContentsMargins(10, 10, 10, 10) self.main_layout.setSpacing(10) # Header section header_frame = QWidget() header_layout = QHBoxLayout(header_frame) header_layout.setContentsMargins(0, 0, 0, 0) self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)") self.title_label.setStyleSheet(f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};") header_layout.addWidget(self.title_label, 1) # Connection indicator self.connection_label = QLabel("Disconnected") header_layout.addWidget(self.connection_label) self.status_light = QLabel() self.status_light.setFixedSize(20, 20) self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") header_layout.addWidget(self.status_light) # Reconnect button self.reconnect_btn = QPushButton("Connect") self.reconnect_btn.clicked.connect(self.reconnect_device) header_layout.addWidget(self.reconnect_btn) self.main_layout.addWidget(header_frame) # Measurement display display_frame = QFrame() display_frame.setFrameShape(QFrame.StyledPanel) display_layout = QGridLayout(display_frame) display_layout.setHorizontalSpacing(15) display_layout.setVerticalSpacing(8) measurement_labels = [ ("Voltage", "V"), ("Current", "A"), ("Test Phase", ""), ("Elapsed Time", "s"), ("Discharge Capacity", "Ah"), ("Charge Capacity", "Ah"), ("Coulomb Efficiency", "%"), ("Cycle Count", ""), ] self.value_labels = {} # Optional: Store labels for easy update for i, (label, unit) in enumerate(measurement_labels): row = i // 2 col = (i % 2) * 3 # Each block is 3 columns: label | value | unit name_label = QLabel(label + ":") name_label.setStyleSheet("font-size: 11px;") display_layout.addWidget(name_label, row, col) value_label = QLabel("0.000") value_label.setStyleSheet("font-size: 12px; font-weight: bold; min-width: 60px;") display_layout.addWidget(value_label, row, col + 1) unit_label = QLabel(unit) unit_label.setStyleSheet("font-size: 11px; color: gray;") display_layout.addWidget(unit_label, row, col + 2) # Save reference for updating later self.value_labels[label] = value_label # Assign to instance attributes for specific fields if label == "Voltage": self.voltage_label = value_label elif label == "Current": self.current_label = value_label elif label == "Test Phase": self.phase_label = value_label elif label == "Elapsed Time": self.time_label = value_label elif label == "Discharge Capacity": self.capacity_label = value_label elif label == "Charge Capacity": self.charge_capacity_label = value_label elif label == "Coulomb Efficiency": self.efficiency_label = value_label elif label == "Cycle Count": self.cycle_label = value_label self.main_layout.addWidget(display_frame) # Progress bar self.progress_bar = QProgressBar() self.progress_bar.setRange(0, 100) self.progress_bar.setTextVisible(False) self.progress_bar.setStyleSheet(f""" QProgressBar {{ border: 1px solid {self.fg_color.name()}; border-radius: 5px; text-align: center; }} QProgressBar::chunk {{ background-color: {self.accent_color.name()}; }} """) self.main_layout.addWidget(self.progress_bar) # Control section controls_frame = QWidget() controls_layout = QHBoxLayout(controls_frame) controls_layout.setContentsMargins(0, 0, 0, 0) # Parameters frame params_frame = QFrame() params_frame.setFrameShape(QFrame.StyledPanel) params_layout = QGridLayout(params_frame) # Battery capacity params_layout.addWidget(QLabel("Battery Capacity (Ah):"), 0, 0) self.capacity_input = QLineEdit("0.2") self.capacity_input.setFixedWidth(80) self.capacity_input.setToolTip("Nominal capacity of the battery in Amp-hours") params_layout.addWidget(self.capacity_input, 0, 1) # Charge cutoff voltage params_layout.addWidget(QLabel("Charge Cutoff (V):"), 1, 0) self.charge_cutoff_input = QLineEdit("1.43") self.charge_cutoff_input.setFixedWidth(80) self.charge_cutoff_input.setToolTip("Voltage at which charging should stop") params_layout.addWidget(self.charge_cutoff_input, 1, 1) # Discharge cutoff voltage params_layout.addWidget(QLabel("Discharge Cutoff (V):"), 2, 0) self.discharge_cutoff_input = QLineEdit("0.9") self.discharge_cutoff_input.setFixedWidth(80) self.discharge_cutoff_input.setToolTip("Voltage at which discharging should stop") params_layout.addWidget(self.discharge_cutoff_input, 2, 1) # Rest time params_layout.addWidget(QLabel("Rest Time (hours):"), 3, 0) self.rest_time_input = QLineEdit("0.25") self.rest_time_input.setFixedWidth(80) self.rest_time_input.setToolTip("Rest period between charge/discharge cycles") params_layout.addWidget(self.rest_time_input, 3, 1) # C-Rate for test params_layout.addWidget(QLabel("Test C-Rate:"), 0, 2) self.c_rate_input = QLineEdit("0.1") self.c_rate_input.setFixedWidth(60) self.c_rate_input.setToolTip("Charge/discharge rate relative to battery capacity (e.g., 0.2 for C/5)") params_layout.addWidget(self.c_rate_input, 0, 3) params_layout.addWidget(QLabel("(e.g., 0.2 for C/5)"), 0, 4) controls_layout.addWidget(params_frame, 1) # Button area button_frame = QWidget() button_layout = QVBoxLayout(button_frame) button_layout.setContentsMargins(0, 0, 0, 0) self.start_button = QPushButton("START TEST") self.start_button.clicked.connect(self.start_test) self.start_button.setStyleSheet(f""" QPushButton {{ background-color: {self.accent_color.name()}; font-weight: bold; padding: 8px; border-radius: 5px; }} QPushButton:disabled {{ background-color: #4C566A; }} """) self.start_button.setEnabled(False) button_layout.addWidget(self.start_button) self.stop_button = QPushButton("STOP TEST") self.stop_button.clicked.connect(self.stop_test) self.stop_button.setStyleSheet(f""" QPushButton {{ background-color: {self.warning_color.name()}; font-weight: bold; padding: 8px; border-radius: 5px; }} QPushButton:disabled {{ background-color: #4C566A; }} """) self.stop_button.setEnabled(False) button_layout.addWidget(self.stop_button) # Continuous mode checkbox self.continuous_check = QCheckBox("Continuous Mode") self.continuous_check.setChecked(True) self.continuous_check.setToolTip("Run multiple charge/discharge cycles until stopped") button_layout.addWidget(self.continuous_check) controls_layout.addWidget(button_frame) self.main_layout.addWidget(controls_frame) # Plot area self._setup_plot() self.main_layout.addWidget(self.plot_widget, 1) # Status bar self.status_bar = QLabel("Ready") self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;") self.main_layout.addWidget(self.status_bar) def _setup_plot(self): """Configure the matplotlib plot.""" self.plot_widget = QWidget() plot_layout = QVBoxLayout(self.plot_widget) self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440') self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) self.ax = self.fig.add_subplot(111) self.ax.set_facecolor('#3B4252') # Initial voltage range voltage_padding = 0.2 min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) # Voltage plot self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) self.ax.set_ylabel("Voltage (V)", color='#00BFFF') self.ax.tick_params(axis='y', labelcolor='#00BFFF') # Current plot (right axis) self.ax2 = self.ax.twinx() current_padding = 0.05 test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text()) max_current = test_current * 1.5 self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) self.ax2.set_ylabel("Current (A)", color='r') self.ax2.tick_params(axis='y', labelcolor='r') self.ax.set_xlabel('Time (s)', color=self.fg_color.name()) self.ax.set_title('Battery Test (CC)', color=self.fg_color.name()) self.ax.tick_params(axis='x', colors=self.fg_color.name()) self.ax.grid(True, color='#4C566A') # Position legends self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) # Embed plot self.canvas = FigureCanvas(self.fig) plot_layout.addWidget(self.canvas) def safe_init_device(self): """Safe device initialization with error handling.""" try: self.init_device() except Exception as e: self.handle_device_error(str(e)) def init_device(self): """Initialize the ADALM1000 device.""" # Temporarily enable USB debugging os.environ['LIBUSB_DEBUG'] = '3' # Set to 0 in production self.cleanup_device() try: print("Waiting before initializing session...") time.sleep(1.5) # Delay helps avoid "device busy" issues self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) # 🔍 Log detected devices print(f"Devices found: {self.session.devices}") if not self.session.devices: raise Exception("No ADALM1000 detected - check USB connection") self.dev = self.session.devices[0] # Reset channels self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].constant(0) self.session.start(0) # Update UI self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") self.connection_label.setText("Connected") self.status_bar.setText("Device connected | Ready for measurement") self.session_active = True self.start_button.setEnabled(True) # Start measurement thread self.start_measurement_thread() except Exception as e: raise Exception(f"Device initialization failed: {str(e)}") def cleanup_device(self): """Clean up device resources.""" print("Cleaning up device session...") # Stop measurement thread if self.measurement_thread is not None: try: self.measurement_thread.stop() if not self.measurement_thread.wait(1000): print("Warning: Measurement thread didn't stop cleanly") self.measurement_thread = None except Exception as e: print(f"Error stopping measurement thread: {e}") # Stop and delete session if hasattr(self, 'session'): try: if self.session_active: time.sleep(0.1) self.session.end() self.session_active = False del self.session print("Session ended successfully") except Exception as e: print(f"Error ending session: {e}") finally: self.session_active = False # Reset UI indicators self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") self.connection_label.setText("Disconnected") self.start_button.setEnabled(False) self.stop_button.setEnabled(False) def start_measurement_thread(self): """Start the measurement thread.""" if self.measurement_thread is not None: self.measurement_thread.stop() self.measurement_thread.wait(500) self.measurement_thread = MeasurementThread( device=self.dev, interval=self.interval, start_time=self.start_time ) self.measurement_thread.update_signal.connect(self.update_measurements) self.measurement_thread.error_signal.connect(self.handle_device_error) self.measurement_thread.start() def reconnect_device(self): """Attempt to reconnect the device.""" self.status_bar.setText("Attempting to reconnect...") self.cleanup_device() QTimer.singleShot(2000, self.safe_init_device) # Retry with delay def handle_device_error(self, error_msg): """Thread-safe device error handling.""" try: # Ensure this runs in the main thread if QThread.currentThread() != self.thread(): self.error_signal.emit(str(error_msg)) return print(f"Device error: {error_msg}") def update_ui(): self.status_bar.setText(f"Device error: {error_msg}") if self.isVisible(): QMessageBox.critical( self, "Device Error", f"Device error occurred:\n{error_msg}\n\n" "1. Check USB connection\n" "2. Try manual reconnect\n" "3. Restart application if problems persist" ) QTimer.singleShot(0, lambda: ( self.cleanup_device(), update_ui() )) except Exception as e: print(f"Error in error handler: {str(e)}") try: self.cleanup_device() except: pass def start_test(self): """Start the complete battery test cycle.""" if not self.test_running: try: # Get and validate input values capacity = float(self.capacity_input.text()) charge_cutoff = float(self.charge_cutoff_input.text()) discharge_cutoff = float(self.discharge_cutoff_input.text()) c_rate = float(self.c_rate_input.text()) if capacity <= 0: raise ValueError("Battery capacity must be positive") if charge_cutoff <= discharge_cutoff: raise ValueError("Charge cutoff must be higher than discharge cutoff") if c_rate <= 0: raise ValueError("C-rate must be positive") self.continuous_mode = self.continuous_check.isChecked() test_current = c_rate * capacity if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") # Reset data self.time_data.clear() self.voltage_data.clear() self.current_data.clear() self.capacity_ah = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 self.start_time = time.time() self.time_label.setText("00:00:00") self.reset_plot() # Prepare log file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}") # Start test self.test_running = True self.start_time = time.time() self.test_phase = "Initial Discharge" self.phase_label.setText(self.test_phase) self.start_button.setEnabled(False) self.stop_button.setEnabled(True) self.status_bar.setText(f"Test started | Discharging to {discharge_cutoff}V @ {test_current:.3f}A") # Start test sequence in separate thread self.test_thread = TestSequenceThread(self) self.test_thread.progress_updated.connect(self.update_test_progress) self.test_thread.cycle_completed.connect(self.update_cycle_stats) self.test_thread.error_occurred.connect(self.handle_device_error) self.test_thread.finished.connect(self.finalize_test) self.test_thread.start() except Exception as e: QMessageBox.critical(self, "Error", str(e)) def update_test_progress(self, progress: float, phase: str): """Update test progress and phase display.""" self.test_phase = phase self.phase_label.setText(phase) self.progress_bar.setValue(int(progress * 100)) def update_cycle_stats(self, cycle: int, discharge: float, charge: float, efficiency: float): """Update cycle statistics.""" self.cycle_count = cycle self.capacity_ah = discharge self.charge_capacity = charge self.coulomb_efficiency = efficiency self.cycle_label.setText(f"{cycle}") self.capacity_label.setText(f"{discharge:.4f}") self.charge_capacity_label.setText(f"{charge:.4f}") self.efficiency_label.setText(f"{efficiency:.1f}") self.status_bar.setText( f"Cycle {cycle} completed | " f"Discharge: {discharge:.3f}Ah | " f"Charge: {charge:.3f}Ah | " f"Efficiency: {efficiency:.1f}%" ) # Write cycle summary self.write_cycle_summary() def stop_test(self): """Safely stop the running test.""" if not self.test_running: return self.request_stop = True self.test_running = False self.measuring = False self.test_phase = "Ready" self.phase_label.setText(self.test_phase) if hasattr(self, 'dev'): try: self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) except Exception as e: print(f"Error resetting device: {e}") # Update UI self.status_bar.setText("Test stopped - Ready for new test") self.stop_button.setEnabled(False) self.start_button.setEnabled(True) self.finalize_test(show_message=False) def finalize_test(self, show_message: bool = True): """Final cleanup after test completion.""" try: # Write log data if hasattr(self, 'log_buffer') and self.log_buffer: try: with open(self.filename, 'a', newline='') as f: writer = csv.writer(f) writer.writerows(self.log_buffer) self.log_buffer.clear() except Exception as e: print(f"Error writing log buffer: {e}") # Close log file if hasattr(self, 'current_cycle_file') and self.current_cycle_file: try: self.current_cycle_file.flush() os.fsync(self.current_cycle_file.fileno()) self.current_cycle_file.close() except Exception as e: print(f"Error closing log file: {e}") # Show notification if show_message: msg_box = QMessageBox(self) msg_box.setWindowFlags(msg_box.windowFlags() | Qt.WindowStaysOnTopHint) msg_box.setIcon(QMessageBox.Information) msg_box.setWindowTitle("Test Complete") msg_box.setText(f"Test completed\nCycles: {self.cycle_count}") msg_box.exec_() except Exception as e: print(f"Critical error in finalize_test: {e}") finally: # Reset test status self.test_running = False self.request_stop = True self.measuring = False def update_measurements(self, voltage: float, current: float, current_time: float): """Update measurements in the UI.""" if len(self.time_data) > 10000: # Limit data points self.time_data.popleft() self.voltage_data.popleft() self.current_data.popleft() self.time_data.append(current_time) self.voltage_data.append(voltage) self.current_data.append(current) # Update UI self.voltage_label.setText(f"{voltage:.4f}") self.current_label.setText(f"{current:.4f}") self.time_label.setText(self.format_time(current_time)) # Update plot periodically if len(self.time_data) % 10 == 0: self.update_plot() # Log data if test is running if self.test_running and hasattr(self, 'current_cycle_file'): self.log_buffer.append([ f"{current_time:.3f}", f"{voltage:.6f}", f"{current:.6f}", self.test_phase, f"{self.capacity_ah:.4f}", f"{self.charge_capacity:.4f}", f"{self.coulomb_efficiency:.1f}", f"{self.cycle_count}" ]) # Write data in blocks if len(self.log_buffer) >= 10: with open(self.filename, 'a', newline='') as f: writer = csv.writer(f) writer.writerows(self.log_buffer) self.log_buffer.clear() def write_cycle_summary(self): """Write cycle summary to the current cycle's log file""" if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file: return summary_line = ( f"Cycle {self.cycle_count.get()} Summary - " f"Discharge={self.capacity_ah.get():.4f}Ah, " f"Charge={self.charge_capacity.get():.4f}Ah, " f"Efficiency={self.coulomb_efficiency.get():.1f}%" ) # Ensure file is open and write summary try: if self.log_buffer: self.log_writer.writerows(self.log_buffer) self.log_buffer.clear() self.current_cycle_file.write(summary_line + "\n") self.current_cycle_file.flush() except Exception as e: print(f"Error writing cycle summary: {e}") def update_plot(self): """Update the plot with new data.""" if not self.time_data: return self.line_voltage.set_data(list(self.time_data), list(self.voltage_data)) self.line_current.set_data(list(self.time_data), list(self.current_data)) self.auto_scale_axes() self.canvas.draw_idle() def auto_scale_axes(self): """Automatically adjust plot axes.""" if not self.time_data: return # X-axis adjustment max_time = list(self.time_data)[-1] current_xlim = self.ax.get_xlim() if max_time > current_xlim[1] * 0.95: new_xmax = max_time * 1.10 # 10% padding self.ax.set_xlim(0, new_xmax) self.ax2.set_xlim(0, new_xmax) # Y-axes adjustment if self.voltage_data: voltage_padding = 0.2 min_v = max(0, min(list(self.voltage_data)) - voltage_padding) max_v = min(5.0, max(list(self.voltage_data)) + voltage_padding) current_ylim = self.ax.get_ylim() new_min = current_ylim[0] + (min_v - current_ylim[0]) * 0.1 new_max = current_ylim[1] + (max_v - current_ylim[1]) * 0.1 self.ax.set_ylim(new_min, new_max) if self.current_data: current_padding = 0.05 min_c = max(-0.25, min(list(self.current_data)) - current_padding) max_c = min(0.25, max(list(self.current_data)) + current_padding) current_ylim = self.ax2.get_ylim() new_min = current_ylim[0] + (min_c - current_ylim[0]) * 0.1 new_max = current_ylim[1] + (max_c - current_ylim[1]) * 0.1 self.ax2.set_ylim(new_min, new_max) @staticmethod def format_time(seconds: float) -> str: """Format seconds as HH:MM:SS. Args: seconds: Time in seconds Returns: Formatted time string """ hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = int(seconds % 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def reset_plot(self): """Reset the plot to initial state.""" self.line_voltage.set_data([], []) self.line_current.set_data([], []) # Reset axes to starting values self.ax.set_xlim(0, 10) voltage_padding = 0.2 min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) self.canvas.draw() def closeEvent(self, event): """Clean up when closing the window.""" self.cleanup_device() event.accept() if __name__ == "__main__": app = QApplication([]) app.setStyle('Fusion') window = BatteryTester() window.show() app.exec_()