# -*- coding: utf-8 -*- import os import time import csv from datetime import datetime import numpy as np # Suppress warnings os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false' os.environ['LIBUSB_DEBUG'] = '0' from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QPushButton, QLineEdit, QFrame, QCheckBox, QMessageBox, QFileDialog, QProgressBar) from PyQt5.QtCore import (Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread, QMutex, QMutexLocker) from PyQt5.QtGui import QColor, QPalette from collections import deque import pysmu import matplotlib matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure class DeviceDisconnectedError(Exception): """Custom exception for device disconnection events.""" pass class MeasurementThread(QThread): """Thread for continuous measurement of voltage and current.""" update_signal = pyqtSignal(float, float, float) # voltage, current, timestamp error_signal = pyqtSignal(str) status_signal = pyqtSignal(str) def __init__(self, device: object, interval: float = 0.1): """Initialize measurement thread. Args: device: ADALM1000 device object interval: Measurement interval in seconds """ super().__init__() self.device = device self.interval = max(0.05, interval) # Minimum interval self._running = False self._mutex = QMutex() # Thread safety self.filter_window_size = 10 self.start_time = time.time() self.last_update_time = self.start_time def run(self): """Main measurement loop with enhanced error handling.""" self._running = True voltage_window = deque() current_window = deque() self.status_signal.emit("Measurement started") while self._running: try: # Read samples with timeout samples = self.device.read( self.filter_window_size, timeout=500, ) if not samples: raise DeviceDisconnectedError("No samples received - device may be disconnected") # Process samples (thread-safe) with QMutexLocker(self._mutex): raw_voltage = np.mean([s[1][0] for s in samples]) raw_current = np.mean([s[0][1] for s in samples]) if voltage_window.size() >= self.filter_window_size: voltage_window.append current_window.append voltage_window.append(raw_voltage) current_window.append(raw_current) voltage = np.mean(list(voltage_window)) current = np.mean(list(current_window)) current_time = time.time() - self.start_time # Emit updates self.update_signal.emit(voltage, current, current_time) self.last_update_time = time.time() # Dynamic sleep adjustment elapsed = time.time() - self.last_update_time sleep_time = max(0.01, self.interval - elapsed) time.sleep(sleep_time) except DeviceDisconnectedError as e: self.error_signal.emit(f"Device error: {str(e)}") break except Exception as e: self.error_signal.emit(f"Measurement error: {str(e)}") self.status_signal.emit(f"Error: {str(e)}") break self.status_signal.emit("Measurement stopped") self._running = False def stop(self): """Safe thread termination with timeout.""" self._running = False if self.isRunning(): self.quit() if not self.wait(300): # 300ms grace period self.terminate() def is_active(self) -> bool: """Check if thread is running and updating.""" with QMutexLocker(self._mutex): return self._running and (time.time() - self.last_update_time < 2.0) class TestSequenceThread(QThread): """Thread for executing battery test sequences.""" progress_updated = pyqtSignal(float, str) # progress, phase cycle_completed = pyqtSignal(int, float, float, float) # cycle, discharge, charge, efficiency error_occurred = pyqtSignal(str) def __init__(self, parent: QObject): """Initialize test sequence thread. Args: parent: Reference to main BatteryTester instance """ super().__init__() self.parent = parent self._mutex = QMutex() self._running = False def run(self): """Execute the complete test sequence.""" self._running = True try: test_current = float(self.parent.c_rate_input.text()) * float(self.parent.capacity_input.text()) charge_cutoff = float(self.parent.charge_cutoff_input.text()) discharge_cutoff = float(self.parent.discharge_cutoff_input.text()) while self._running and (self.parent.continuous_mode or self.parent.cycle_count == 0): with QMutexLocker(self._mutex): if self.parent.request_stop: break self.parent.cycle_count += 1 cycle = self.parent.cycle_count # Charge phase self._execute_phase("charge", test_current, charge_cutoff) if not self._running: break # Rest after charge self._execute_rest("post-charge") if not self._running: break # Discharge phase self._execute_phase("discharge", test_current, discharge_cutoff) if not self.parent.continuous_mode: break # Rest after discharge if self._running: self._execute_rest("post-discharge") # Calculate efficiency if self.parent.charge_capacity > 0: efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100 self.cycle_completed.emit(cycle, self.parent.capacity_ah, self.parent.charge_capacity, efficiency) except Exception as e: self.error_occurred.emit(str(e)) finally: self._running = False def _execute_phase(self, phase: str, current: float, target_voltage: float): """Execute charge/discharge phase. Args: phase: Either 'charge' or 'discharge' current: Current in amps target_voltage: Target voltage in volts """ try: if not hasattr(self.parent, 'dev') or not self.parent.session_active: raise DeviceDisconnectedError("Device not connected") # Configure device self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z time.sleep(0.1) if phase == "charge": self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV self.parent.dev.channels['A'].constant(current) self.progress_updated.emit(0.0, "Charging") else: self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV self.parent.dev.channels['A'].constant(-current) self.progress_updated.emit(0.0, "Discharging") time.sleep(0.1) start_time = time.time() last_update = start_time while self._running: with QMutexLocker(self._mutex): if self.parent.request_stop: break if not self.parent.voltage_data: time.sleep(0.1) continue current_voltage = self.parent.voltage_data[-1] current_time = time.time() delta_t = current_time - last_update last_update = current_time # Update capacity if phase == "charge": self.parent.charge_capacity += abs(self.parent.current_data[-1]) * delta_t / 3600 progress = (current_voltage - self.parent.discharge_cutoff) / \ (target_voltage - self.parent.discharge_cutoff) else: self.parent.capacity_ah += abs(self.parent.current_data[-1]) * delta_t / 3600 progress = (self.parent.charge_cutoff - current_voltage) / \ (self.parent.charge_cutoff - target_voltage) progress = max(0.0, min(1.0, progress)) self.progress_updated.emit(progress, f"{phase.capitalize()}ing") # Check termination conditions if ((phase == "charge" and current_voltage >= target_voltage) or (phase == "discharge" and current_voltage <= target_voltage)): break time.sleep(0.1) except Exception as e: self.error_occurred.emit(f"{phase.capitalize()} error: {str(e)}") raise def _execute_rest(self, phase: str): """Execute rest phase. Args: phase: Description of rest phase """ try: self.progress_updated.emit(0.0, f"Resting ({phase})") self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z self.parent.dev.channels['A'].constant(0) rest_time = float(self.parent.rest_time_input.text()) * 3600 rest_end = time.time() + rest_time while time.time() < rest_end and self._running: with QMutexLocker(self._mutex): if self.parent.request_stop: break progress = 1 - (rest_end - time.time()) / rest_time self.progress_updated.emit(progress, f"Resting ({phase})") time.sleep(1) except Exception as e: self.error_occurred.emit(f"Rest phase error: {str(e)}") raise def stop(self): """Safely stop the test sequence.""" with QMutexLocker(self._mutex): self._running = False self.wait(500) # Wait up to 500ms for clean exit class BatteryTester(QMainWindow): """Main application window for battery capacity testing.""" error_signal = pyqtSignal(str) def __init__(self): """Initialize the battery tester application.""" super().__init__() self.error_signal.connect(self.handle_device_error) # Initialize data buffers self.time_data = deque() self.voltage_data = deque() self.current_data = deque() self.phase_data = deque() # Test variables self.test_phase = "Ready" self.capacity_ah = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 # Color scheme self.bg_color = QColor(46, 52, 64) self.fg_color = QColor(216, 222, 233) self.accent_color = QColor(94, 129, 172) self.warning_color = QColor(191, 97, 106) self.success_color = QColor(163, 190, 140) # Device status self.session_active = False self.measuring = False self.test_running = False self.continuous_mode = False self.request_stop = False self.interval = 0.1 self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) # Thread management self.measurement_thread = None self.test_thread = None # Initialize UI self._setup_ui() # Initialize device with delay QTimer.singleShot(100, self.safe_init_device) def _setup_ui(self): """Configure the user interface.""" self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)") self.resize(1000, 800) self.setMinimumSize(800, 700) # Set color palette palette = self.palette() palette.setColor(QPalette.Window, self.bg_color) palette.setColor(QPalette.WindowText, self.fg_color) palette.setColor(QPalette.Base, QColor(59, 66, 82)) palette.setColor(QPalette.Text, self.fg_color) palette.setColor(QPalette.Button, self.accent_color) palette.setColor(QPalette.ButtonText, self.fg_color) self.setPalette(palette) # Main widget and layout self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.main_layout = QVBoxLayout(self.central_widget) self.main_layout.setContentsMargins(10, 10, 10, 10) self.main_layout.setSpacing(10) # Header section header_frame = QWidget() header_layout = QHBoxLayout(header_frame) header_layout.setContentsMargins(0, 0, 0, 0) self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)") self.title_label.setStyleSheet(f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};") header_layout.addWidget(self.title_label, 1) # Connection indicator self.connection_label = QLabel("Disconnected") header_layout.addWidget(self.connection_label) self.status_light = QLabel() self.status_light.setFixedSize(20, 20) self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") header_layout.addWidget(self.status_light) # Reconnect button self.reconnect_btn = QPushButton("Connect") self.reconnect_btn.clicked.connect(self.reconnect_device) header_layout.addWidget(self.reconnect_btn) self.main_layout.addWidget(header_frame) # Measurement display display_frame = QFrame() display_frame.setFrameShape(QFrame.StyledPanel) display_layout = QGridLayout(display_frame) measurement_labels = [ ("Voltage (V)", "V"), ("Current (A)", "A"), ("Test Phase", ""), ("Elapsed Time", "s"), ("Discharge Capacity", "Ah"), ("Charge Capacity", "Ah"), ("Coulomb Eff.", "%"), ("Cycle Count", ""), ] for i, (label, unit) in enumerate(measurement_labels): row = i // 2 col = (i % 2) * 2 lbl = QLabel(f"{label}:") lbl.setStyleSheet("font-size: 11px;") display_layout.addWidget(lbl, row, col) value_label = QLabel("0.000") value_label.setStyleSheet("font-size: 12px; font-weight: bold;") display_layout.addWidget(value_label, row, col + 1) if unit: unit_label = QLabel(unit) display_layout.addWidget(unit_label, row, col + 2) if i == 0: self.voltage_label = value_label elif i == 1: self.current_label = value_label elif i == 2: self.phase_label = value_label self.phase_label.setText(self.test_phase) elif i == 3: self.time_label = value_label elif i == 4: self.capacity_label = value_label elif i == 5: self.charge_capacity_label = value_label elif i == 6: self.efficiency_label = value_label elif i == 7: self.cycle_label = value_label self.main_layout.addWidget(display_frame) # Progress bar self.progress_bar = QProgressBar() self.progress_bar.setRange(0, 100) self.progress_bar.setTextVisible(False) self.progress_bar.setStyleSheet(f""" QProgressBar {{ border: 1px solid {self.fg_color.name()}; border-radius: 5px; text-align: center; }} QProgressBar::chunk {{ background-color: {self.accent_color.name()}; }} """) self.main_layout.addWidget(self.progress_bar) # Control section controls_frame = QWidget() controls_layout = QHBoxLayout(controls_frame) controls_layout.setContentsMargins(0, 0, 0, 0) # Parameters frame params_frame = QFrame() params_frame.setFrameShape(QFrame.StyledPanel) params_layout = QGridLayout(params_frame) # Battery capacity params_layout.addWidget(QLabel("Battery Capacity (Ah):"), 0, 0) self.capacity_input = QLineEdit("0.2") self.capacity_input.setFixedWidth(80) self.capacity_input.setToolTip("Nominal capacity of the battery in Amp-hours") params_layout.addWidget(self.capacity_input, 0, 1) # Charge cutoff voltage params_layout.addWidget(QLabel("Charge Cutoff (V):"), 1, 0) self.charge_cutoff_input = QLineEdit("1.43") self.charge_cutoff_input.setFixedWidth(80) self.charge_cutoff_input.setToolTip("Voltage at which charging should stop") params_layout.addWidget(self.charge_cutoff_input, 1, 1) # Discharge cutoff voltage params_layout.addWidget(QLabel("Discharge Cutoff (V):"), 2, 0) self.discharge_cutoff_input = QLineEdit("0.9") self.discharge_cutoff_input.setFixedWidth(80) self.discharge_cutoff_input.setToolTip("Voltage at which discharging should stop") params_layout.addWidget(self.discharge_cutoff_input, 2, 1) # Rest time params_layout.addWidget(QLabel("Rest Time (hours):"), 3, 0) self.rest_time_input = QLineEdit("0.25") self.rest_time_input.setFixedWidth(80) self.rest_time_input.setToolTip("Rest period between charge/discharge cycles") params_layout.addWidget(self.rest_time_input, 3, 1) # C-Rate for test params_layout.addWidget(QLabel("Test C-Rate:"), 0, 2) self.c_rate_input = QLineEdit("0.1") self.c_rate_input.setFixedWidth(60) self.c_rate_input.setToolTip("Charge/discharge rate relative to battery capacity (e.g., 0.2 for C/5)") params_layout.addWidget(self.c_rate_input, 0, 3) params_layout.addWidget(QLabel("(e.g., 0.2 for C/5)"), 0, 4) controls_layout.addWidget(params_frame, 1) # Button area button_frame = QWidget() button_layout = QVBoxLayout(button_frame) button_layout.setContentsMargins(0, 0, 0, 0) self.start_button = QPushButton("START TEST") self.start_button.clicked.connect(self.start_test) self.start_button.setStyleSheet(f""" QPushButton {{ background-color: {self.accent_color.name()}; font-weight: bold; padding: 8px; border-radius: 5px; }} QPushButton:disabled {{ background-color: #4C566A; }} """) self.start_button.setEnabled(False) button_layout.addWidget(self.start_button) self.stop_button = QPushButton("STOP TEST") self.stop_button.clicked.connect(self.stop_test) self.stop_button.setStyleSheet(f""" QPushButton {{ background-color: {self.warning_color.name()}; font-weight: bold; padding: 8px; border-radius: 5px; }} QPushButton:disabled {{ background-color: #4C566A; }} """) self.stop_button.setEnabled(False) button_layout.addWidget(self.stop_button) # Continuous mode checkbox self.continuous_check = QCheckBox("Continuous Mode") self.continuous_check.setChecked(True) self.continuous_check.setToolTip("Run multiple charge/discharge cycles until stopped") button_layout.addWidget(self.continuous_check) controls_layout.addWidget(button_frame) self.main_layout.addWidget(controls_frame) # Plot area self._setup_plot() self.main_layout.addWidget(self.plot_widget, 1) # Status bar self.status_bar = QLabel("Ready") self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;") self.main_layout.addWidget(self.status_bar) def _setup_plot(self): """Configure the matplotlib plot.""" self.plot_widget = QWidget() plot_layout = QVBoxLayout(self.plot_widget) self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440') self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) self.ax = self.fig.add_subplot(111) self.ax.set_facecolor('#3B4252') # Initial voltage range voltage_padding = 0.2 min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) # Voltage plot self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) self.ax.set_ylabel("Voltage (V)", color='#00BFFF') self.ax.tick_params(axis='y', labelcolor='#00BFFF') # Current plot (right axis) self.ax2 = self.ax.twinx() current_padding = 0.05 test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text()) max_current = test_current * 1.5 self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) self.ax2.set_ylabel("Current (A)", color='r') self.ax2.tick_params(axis='y', labelcolor='r') self.ax.set_xlabel('Time (s)', color=self.fg_color.name()) self.ax.set_title('Battery Test (CC)', color=self.fg_color.name()) self.ax.tick_params(axis='x', colors=self.fg_color.name()) self.ax.grid(True, color='#4C566A') # Position legends self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) # Embed plot self.canvas = FigureCanvas(self.fig) plot_layout.addWidget(self.canvas) def safe_init_device(self): """Safe device initialization with error handling.""" try: self.init_device() except Exception as e: self.handle_device_error(str(e)) def init_device(self): """Initialize the ADALM1000 device.""" # Temporarily enable USB debugging os.environ['LIBUSB_DEBUG'] = '3' # Set to 0 in production self.cleanup_device() try: # Delay to avoid "Device busy" errors time.sleep(1.5) self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) if not self.session.devices: raise Exception("No ADALM1000 detected - check connection") self.dev = self.session.devices[0] # Reset channels self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].constant(0) self.session.start(0) self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") self.connection_label.setText("Connected") self.status_bar.setText("Device connected | Ready for measurement") self.session_active = True self.start_button.setEnabled(True) # Start measurement thread self.start_measurement_thread() except Exception as e: raise Exception(f"Device initialization failed: {str(e)}") def cleanup_device(self): """Clean up device resources.""" # Stop measurement thread if self.measurement_thread is not None: try: self.measurement_thread.stop() if not self.measurement_thread.wait(1000): # 1 second timeout print("Warning: Measurement thread didn't stop cleanly") self.measurement_thread = None except Exception as e: print(f"Error stopping measurement thread: {e}") # Clean up session if hasattr(self, 'session'): try: if self.session_active: time.sleep(0.1) self.session.end() self.session_active = False del self.session except Exception as e: print(f"Error cleaning up session: {e}") finally: self.session_active = False # Reset UI status self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") self.connection_label.setText("Disconnected") self.start_button.setEnabled(False) self.stop_button.setEnabled(False) def start_measurement_thread(self): """Start the measurement thread.""" if self.measurement_thread is not None: self.measurement_thread.stop() self.measurement_thread = MeasurementThread(self.dev, self.interval) self.measurement_thread.update_signal.connect(self.update_measurements) self.measurement_thread.error_signal.connect(self.handle_device_error) self.measurement_thread.start() def reconnect_device(self): """Attempt to reconnect the device.""" self.status_bar.setText("Attempting to reconnect...") self.cleanup_device() QTimer.singleShot(2000, self.safe_init_device) # Retry with delay def handle_device_error(self, error_msg): """Thread-safe device error handling.""" try: # Ensure this runs in the main thread if QThread.currentThread() != self.thread(): self.error_signal.emit(str(error_msg)) return print(f"Device error: {error_msg}") def update_ui(): self.status_bar.setText(f"Device error: {error_msg}") if self.isVisible(): QMessageBox.critical( self, "Device Error", f"Device error occurred:\n{error_msg}\n\n" "1. Check USB connection\n" "2. Try manual reconnect\n" "3. Restart application if problems persist" ) QTimer.singleShot(0, lambda: ( self.cleanup_device(), update_ui() )) except Exception as e: print(f"Error in error handler: {str(e)}") try: self.cleanup_device() except: pass def start_test(self): """Start the complete battery test cycle.""" if not self.test_running: try: # Get and validate input values capacity = float(self.capacity_input.text()) charge_cutoff = float(self.charge_cutoff_input.text()) discharge_cutoff = float(self.discharge_cutoff_input.text()) c_rate = float(self.c_rate_input.text()) if capacity <= 0: raise ValueError("Battery capacity must be positive") if charge_cutoff <= discharge_cutoff: raise ValueError("Charge cutoff must be higher than discharge cutoff") if c_rate <= 0: raise ValueError("C-rate must be positive") self.continuous_mode = self.continuous_check.isChecked() test_current = c_rate * capacity if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") # Reset data self.time_data.clear() self.voltage_data.clear() self.current_data.clear() self.capacity_ah = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 self.start_time = time.time() self.time_label.setText("00:00:00") self.reset_plot() # Prepare log file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}") # Start test self.test_running = True self.start_time = time.time() self.test_phase = "Initial Discharge" self.phase_label.setText(self.test_phase) self.start_button.setEnabled(False) self.stop_button.setEnabled(True) self.status_bar.setText(f"Test started | Discharging to {discharge_cutoff}V @ {test_current:.3f}A") # Start test sequence in separate thread self.test_thread = TestSequenceThread(self) self.test_thread.progress_updated.connect(self.update_test_progress) self.test_thread.cycle_completed.connect(self.update_cycle_stats) self.test_thread.error_occurred.connect(self.handle_device_error) self.test_thread.finished.connect(self.finalize_test) self.test_thread.start() except Exception as e: QMessageBox.critical(self, "Error", str(e)) def update_test_progress(self, progress: float, phase: str): """Update test progress and phase display.""" self.test_phase = phase self.phase_label.setText(phase) self.progress_bar.setValue(int(progress * 100)) def update_cycle_stats(self, cycle: int, discharge: float, charge: float, efficiency: float): """Update cycle statistics.""" self.cycle_count = cycle self.capacity_ah = discharge self.charge_capacity = charge self.coulomb_efficiency = efficiency self.cycle_label.setText(f"{cycle}") self.capacity_label.setText(f"{discharge:.4f}") self.charge_capacity_label.setText(f"{charge:.4f}") self.efficiency_label.setText(f"{efficiency:.1f}") self.status_bar.setText( f"Cycle {cycle} completed | " f"Discharge: {discharge:.3f}Ah | " f"Charge: {charge:.3f}Ah | " f"Efficiency: {efficiency:.1f}%" ) # Write cycle summary self.write_cycle_summary() def stop_test(self): """Safely stop the running test.""" if not self.test_running: return self.request_stop = True self.test_running = False self.measuring = False self.test_phase = "Ready" self.phase_label.setText(self.test_phase) if hasattr(self, 'dev'): try: self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) except Exception as e: print(f"Error resetting device: {e}") # Update UI self.status_bar.setText("Test stopped - Ready for new test") self.stop_button.setEnabled(False) self.start_button.setEnabled(True) self.finalize_test(show_message=False) def finalize_test(self, show_message: bool = True): """Final cleanup after test completion.""" try: # Write log data if hasattr(self, 'log_buffer') and self.log_buffer: try: with open(self.filename, 'a', newline='') as f: writer = csv.writer(f) writer.writerows(self.log_buffer) self.log_buffer.clear() except Exception as e: print(f"Error writing log buffer: {e}") # Close log file if hasattr(self, 'current_cycle_file') and self.current_cycle_file: try: self.current_cycle_file.flush() os.fsync(self.current_cycle_file.fileno()) self.current_cycle_file.close() except Exception as e: print(f"Error closing log file: {e}") # Show notification if show_message: msg_box = QMessageBox(self) msg_box.setWindowFlags(msg_box.windowFlags() | Qt.WindowStaysOnTopHint) msg_box.setIcon(QMessageBox.Information) msg_box.setWindowTitle("Test Complete") msg_box.setText(f"Test completed\nCycles: {self.cycle_count}") msg_box.exec_() except Exception as e: print(f"Critical error in finalize_test: {e}") finally: # Reset test status self.test_running = False self.request_stop = True self.measuring = False def update_measurements(self, voltage: float, current: float, current_time: float): """Update measurements in the UI.""" if self.time_data.size() > 10000: # Limit data points self.time_data.append self.voltage_data.append self.current_data.append self.time_data.append(current_time) self.voltage_data.append(voltage) self.current_data.append(current) # Update UI self.voltage_label.setText(f"{voltage:.4f}") self.current_label.setText(f"{current:.4f}") self.time_label.setText(self.format_time(current_time)) # Update plot periodically if self.time_data.size() % 10 == 0: self.update_plot() # Log data if test is running if self.test_running and hasattr(self, 'current_cycle_file'): self.log_buffer.append([ f"{current_time:.3f}", f"{voltage:.6f}", f"{current:.6f}", self.test_phase, f"{self.capacity_ah:.4f}", f"{self.charge_capacity:.4f}", f"{self.coulomb_efficiency:.1f}", f"{self.cycle_count}" ]) # Write data in blocks if len(self.log_buffer) >= 10: with open(self.filename, 'a', newline='') as f: writer = csv.writer(f) writer.writerows(self.log_buffer) self.log_buffer.clear() def update_plot(self): """Update the plot with new data.""" if self.time_data.empty(): return self.line_voltage.set_data(list(self.time_data), list(self.voltage_data)) self.line_current.set_data(list(self.time_data), list(self.current_data)) self.auto_scale_axes() self.canvas.draw_idle() def auto_scale_axes(self): """Automatically adjust plot axes.""" if self.time_data.empty(): return # X-axis adjustment max_time = list(self.time_data)[-1] current_xlim = self.ax.get_xlim() if max_time > current_xlim[1] * 0.95: new_xmax = current_xlim[1] + (max_time * 1.05 - current_xlim[1]) * 0.1 self.ax.set_xlim(0, new_xmax) self.ax2.set_xlim(0, new_xmax) # Y-axes adjustment if not self.voltage_data.empty(): voltage_padding = 0.2 min_v = max(0, min(list(self.voltage_data)) - voltage_padding) max_v = min(5.0, max(list(self.voltage_data)) + voltage_padding) current_ylim = self.ax.get_ylim() new_min = current_ylim[0] + (min_v - current_ylim[0]) * 0.1 new_max = current_ylim[1] + (max_v - current_ylim[1]) * 0.1 self.ax.set_ylim(new_min, new_max) if not self.current_data.empty(): current_padding = 0.05 min_c = max(-0.25, min(list(self.current_data)) - current_padding) max_c = min(0.25, max(list(self.current_data)) + current_padding) current_ylim = self.ax2.get_ylim() new_min = current_ylim[0] + (min_c - current_ylim[0]) * 0.1 new_max = current_ylim[1] + (max_c - current_ylim[1]) * 0.1 self.ax2.set_ylim(new_min, new_max) @staticmethod def format_time(seconds: float) -> str: """Format seconds as HH:MM:SS. Args: seconds: Time in seconds Returns: Formatted time string """ hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = int(seconds % 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def reset_plot(self): """Reset the plot to initial state.""" self.line_voltage.set_data([], []) self.line_current.set_data([], []) # Reset axes to starting values self.ax.set_xlim(0, 10) voltage_padding = 0.2 min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding self.ax.set_ylim(min_voltage, max_volume) self.canvas.draw() def closeEvent(self, event): """Clean up when closing the window.""" self.cleanup_device() event.accept() if __name__ == "__main__": app = QApplication([]) app.setStyle('Fusion') window = BatteryTester() window.show() app.exec_()