# -*- coding: utf-8 -*-
"""ADALM1000 battery capacity tester (constant-current charge/discharge).

A PyQt5 window shows live voltage/current readings taken through pysmu,
plots them with matplotlib and drives a charge -> rest -> discharge cycle
on channel A while logging every cycle to a CSV file.

Threading model used in this file:

* ``MeasurementThread`` (a ``QThread``) polls the device and emits readings.
* The test sequence runs in a ``threading.Thread``; it never touches widgets
  directly but posts UI work to the GUI thread through Qt signals.
"""
import os
import time
import csv
import threading
from datetime import datetime
from collections import deque

import numpy as np

# Suppress QStandardPaths warning
os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false'

from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
                             QHBoxLayout, QGridLayout, QLabel, QPushButton,
                             QLineEdit, QFrame, QCheckBox, QMessageBox,
                             QFileDialog)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread
from PyQt5.QtGui import QColor, QPalette
import pysmu
import matplotlib
matplotlib.use('Qt5Agg')
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure


class DeviceDisconnectedError(Exception):
    """Raised when the device returns no samples."""
    pass


class MeasurementThread(QThread):
    """Background thread that polls the device and emits filtered readings.

    Signals:
        update_signal(voltage, current, elapsed_seconds): one filtered reading.
        error_signal(message): emitted once when reading fails; the thread
            then ends.
    """

    update_signal = pyqtSignal(float, float, float)
    error_signal = pyqtSignal(str)

    def __init__(self, device, interval=0.1):
        """Store the device handle and polling interval (seconds)."""
        super().__init__()
        self.device = device
        self.interval = interval
        self.running = False
        self.filter_window_size = 10
        # Origin of the elapsed-time value emitted with every reading; the
        # GUI overwrites it to restart the time axis for a new test.
        self.start_time = time.time()

    def run(self):
        """Poll the device until ``stop()`` is called or a read fails."""
        self.running = True
        # Bounded deques implement the moving-average window.
        voltage_window = deque(maxlen=self.filter_window_size)
        current_window = deque(maxlen=self.filter_window_size)

        while self.running:
            try:
                samples = self.device.read(self.filter_window_size, 500, True)
                if not samples:
                    raise DeviceDisconnectedError("No samples received")

                raw_voltage = np.mean([s[1][0] for s in samples])  # Channel B voltage
                raw_current = np.mean([s[0][1] for s in samples])  # Channel A current
                current_time = time.time() - self.start_time

                # Apply moving average filter
                voltage_window.append(raw_voltage)
                current_window.append(raw_current)
                voltage = float(np.mean(voltage_window))
                current = float(np.mean(current_window))

                self.update_signal.emit(voltage, current, current_time)
                time.sleep(max(0.05, self.interval))
            except Exception as e:
                # A failure while shutting down is expected (the session may
                # be ending); only report errors while still running.
                if self.running:
                    self.error_signal.emit(str(e))
                break

        self.running = False

    def stop(self):
        """Ask the polling loop to end and wait briefly for it to finish."""
        self.running = False
        self.quit()
        # One iteration can take the 500 ms read timeout plus the sleep, so
        # allow more than that before giving up.
        self.wait(2000)


class BatteryTester(QMainWindow):
    """Main window: live readings, plot, and the CC test state machine."""

    # Signals used by the test thread to run UI work on the GUI thread.
    _label_text_signal = pyqtSignal(object, str)   # (widget, text)
    _error_dialog_signal = pyqtSignal(str, str)    # (title, message)
    _test_done_signal = pyqtSignal()

    def __init__(self):
        """Create state, build the UI and schedule the first device connect."""
        super().__init__()

        # Initialize all attributes first
        self.time_data = deque()
        self.voltage_data = deque()
        self.current_data = deque()
        self.phase_data = deque()

        # Test state variables
        self.test_phase = "Idle"
        self.capacity_ah = 0.0
        self.charge_capacity = 0.0
        self.coulomb_efficiency = 0.0
        self.cycle_count = 0

        # Color scheme
        self.bg_color = QColor(46, 52, 64)
        self.fg_color = QColor(216, 222, 233)
        self.accent_color = QColor(94, 129, 172)
        self.warning_color = QColor(191, 97, 106)
        self.success_color = QColor(163, 190, 140)

        # Device and measurement state
        self.session_active = False
        self.measuring = False
        self.test_running = False
        self.continuous_mode = False
        self.request_stop = False
        self.interval = 0.1
        self.log_dir = os.path.expanduser("~/adalm1000/logs")
        os.makedirs(self.log_dir, exist_ok=True)

        # Thread management
        self.measurement_thread = None
        self.measurement_worker = None
        self.test_thread = None
        self._lingering_threads = []  # keeps unfinished QThreads referenced

        # Log file state; guarded by _log_lock because both the GUI thread
        # (update_measurements) and the test thread touch it.
        self._log_lock = threading.Lock()
        self.base_filename = None
        self.filename = None
        self.current_cycle_file = None
        self.log_writer = None
        self.log_buffer = []

        # Test bookkeeping
        self._test_params = None
        self._test_finalized = True      # nothing to finalize until a test starts
        self._continuous_checked = True  # mirror of the checkbox for the test thread

        # Reconnect bookkeeping (prevents stacked retries and dialogs)
        self._reconnect_scheduled = False
        self._reconnect_running = False

        self._label_text_signal.connect(self._apply_label_text)
        self._error_dialog_signal.connect(self._show_error_dialog)
        self._test_done_signal.connect(self.finalize_test)

        # Initialize UI
        self.setup_ui()

        # Initialize device after UI is set up
        QTimer.singleShot(100, self.init_device)

    # ------------------------------------------------------------------
    # GUI-thread helpers for the test thread
    # ------------------------------------------------------------------
    @pyqtSlot(object, str)
    def _apply_label_text(self, widget, text):
        """Set ``text`` on ``widget``; runs on the GUI thread."""
        widget.setText(text)

    @pyqtSlot(str, str)
    def _show_error_dialog(self, title, message):
        """Show a critical message box; runs on the GUI thread."""
        QMessageBox.critical(self, title, message)

    @pyqtSlot(bool)
    def _on_continuous_toggled(self, checked):
        """Mirror the checkbox state so the test thread can read it."""
        self._continuous_checked = checked

    def _post_text(self, widget, text):
        """Request a ``setText`` on the GUI thread (callable from any thread)."""
        self._label_text_signal.emit(widget, text)

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def setup_ui(self):
        """Configure the user interface"""
        self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)")
        self.resize(1000, 800)
        self.setMinimumSize(800, 700)

        # Set background color
        palette = self.palette()
        palette.setColor(QPalette.Window, self.bg_color)
        palette.setColor(QPalette.WindowText, self.fg_color)
        palette.setColor(QPalette.Base, QColor(59, 66, 82))
        palette.setColor(QPalette.Text, self.fg_color)
        palette.setColor(QPalette.Button, self.accent_color)
        palette.setColor(QPalette.ButtonText, self.fg_color)
        self.setPalette(palette)

        # Main widget and layout
        self.central_widget = QWidget()
        self.setCentralWidget(self.central_widget)
        self.main_layout = QVBoxLayout(self.central_widget)
        self.main_layout.setContentsMargins(10, 10, 10, 10)
        self.main_layout.setSpacing(10)

        # Header area
        header_frame = QWidget()
        header_layout = QHBoxLayout(header_frame)
        header_layout.setContentsMargins(0, 0, 0, 0)

        self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)")
        self.title_label.setStyleSheet(
            f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};")
        header_layout.addWidget(self.title_label, 1)

        # Status indicator
        self.connection_label = QLabel("Disconnected")
        header_layout.addWidget(self.connection_label)

        self.status_light = QLabel()
        self.status_light.setFixedSize(20, 20)
        self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
        header_layout.addWidget(self.status_light)

        # Reconnect button
        self.reconnect_btn = QPushButton("Reconnect")
        self.reconnect_btn.clicked.connect(self.reconnect_device)
        header_layout.addWidget(self.reconnect_btn)

        self.main_layout.addWidget(header_frame)

        # Measurement display
        display_frame = QFrame()
        display_frame.setFrameShape(QFrame.StyledPanel)
        display_layout = QGridLayout(display_frame)

        # (caption, unit, attribute that receives the value label)
        measurement_labels = [
            ("Voltage (V)", "V", "voltage_label"),
            ("Current (A)", "A", "current_label"),
            ("Test Phase", "", "phase_label"),
            ("Elapsed Time", "s", "time_label"),
            ("Discharge Capacity", "Ah", "capacity_label"),
            ("Charge Capacity", "Ah", "charge_capacity_label"),
            ("Coulomb Eff.", "%", "efficiency_label"),
            ("Cycle Count", "", "cycle_label"),
        ]

        for i, (label, unit, attr_name) in enumerate(measurement_labels):
            row = i // 2
            col = (i % 2) * 2

            lbl = QLabel(f"{label}:")
            lbl.setStyleSheet("font-size: 11px;")
            display_layout.addWidget(lbl, row, col)

            value_label = QLabel("0.000")
            value_label.setStyleSheet("font-size: 12px; font-weight: bold;")
            display_layout.addWidget(value_label, row, col + 1)

            if unit:
                unit_label = QLabel(unit)
                display_layout.addWidget(unit_label, row, col + 2)

            setattr(self, attr_name, value_label)

        self.main_layout.addWidget(display_frame)

        # Control area
        controls_frame = QWidget()
        controls_layout = QHBoxLayout(controls_frame)
        controls_layout.setContentsMargins(0, 0, 0, 0)

        # Parameters frame
        params_frame = QFrame()
        params_frame.setFrameShape(QFrame.StyledPanel)
        params_layout = QGridLayout(params_frame)

        # Battery capacity
        params_layout.addWidget(QLabel("Battery Capacity (Ah):"), 0, 0)
        self.capacity_input = QLineEdit("0.2")
        self.capacity_input.setFixedWidth(80)
        params_layout.addWidget(self.capacity_input, 0, 1)

        # Charge cutoff
        params_layout.addWidget(QLabel("Charge Cutoff (V):"), 1, 0)
        self.charge_cutoff_input = QLineEdit("1.43")
        self.charge_cutoff_input.setFixedWidth(80)
        params_layout.addWidget(self.charge_cutoff_input, 1, 1)

        # Discharge cutoff
        params_layout.addWidget(QLabel("Discharge Cutoff (V):"), 2, 0)
        self.discharge_cutoff_input = QLineEdit("0.9")
        self.discharge_cutoff_input.setFixedWidth(80)
        params_layout.addWidget(self.discharge_cutoff_input, 2, 1)

        # Rest time
        params_layout.addWidget(QLabel("Rest Time (hours):"), 3, 0)
        self.rest_time_input = QLineEdit("0.25")
        self.rest_time_input.setFixedWidth(80)
        params_layout.addWidget(self.rest_time_input, 3, 1)

        # C-rate for test
        params_layout.addWidget(QLabel("Test C-rate:"), 0, 2)
        self.c_rate_input = QLineEdit("0.1")
        self.c_rate_input.setFixedWidth(60)
        params_layout.addWidget(self.c_rate_input, 0, 3)
        params_layout.addWidget(QLabel("(e.g., 0.2 for C/5)"), 0, 4)

        controls_layout.addWidget(params_frame, 1)

        # Button frame
        button_frame = QWidget()
        button_layout = QVBoxLayout(button_frame)
        button_layout.setContentsMargins(0, 0, 0, 0)

        self.start_button = QPushButton("START TEST")
        self.start_button.clicked.connect(self.start_test)
        self.start_button.setStyleSheet(
            f"background-color: {self.accent_color.name()}; font-weight: bold;")
        self.start_button.setEnabled(False)  # Disabled until device is connected
        button_layout.addWidget(self.start_button)

        self.stop_button = QPushButton("STOP TEST")
        self.stop_button.clicked.connect(self.stop_test)
        self.stop_button.setStyleSheet(
            f"background-color: {self.warning_color.name()}; font-weight: bold;")
        self.stop_button.setEnabled(False)
        button_layout.addWidget(self.stop_button)

        # Continuous mode checkbox
        self.continuous_check = QCheckBox("Continuous Mode")
        self.continuous_check.toggled.connect(self._on_continuous_toggled)
        self.continuous_check.setChecked(True)
        self._continuous_checked = self.continuous_check.isChecked()
        button_layout.addWidget(self.continuous_check)

        controls_layout.addWidget(button_frame)
        self.main_layout.addWidget(controls_frame)

        # Plot area
        self.setup_plot()
        self.main_layout.addWidget(self.plot_widget, 1)

        # Status bar
        self.status_bar = QLabel("Ready")
        self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;")
        self.main_layout.addWidget(self.status_bar)

        # Initialize test phase display
        self.phase_label.setText(self.test_phase)

    def setup_plot(self):
        """Configure the matplotlib plot"""
        self.plot_widget = QWidget()
        plot_layout = QVBoxLayout(self.plot_widget)

        self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440')
        self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
        self.ax = self.fig.add_subplot(111)
        self.ax.set_facecolor('#3B4252')

        # Set initial voltage range
        voltage_padding = 0.2
        min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding)
        max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding
        self.ax.set_ylim(min_voltage, max_voltage)

        # Voltage plot
        self.line_voltage, = self.ax.plot([], [], color='#00BFFF',
                                          label='Voltage (V)', linewidth=2)
        self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
        self.ax.tick_params(axis='y', labelcolor='#00BFFF')

        # Current plot (right axis)
        self.ax2 = self.ax.twinx()
        current_padding = 0.05
        test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text())
        max_current = test_current * 1.5
        self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)

        self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
        self.ax2.set_ylabel("Current (A)", color='r')
        self.ax2.tick_params(axis='y', labelcolor='r')

        self.ax.set_xlabel('Time (s)', color=self.fg_color.name())
        self.ax.set_title('Battery Test (CC)', color=self.fg_color.name())
        self.ax.tick_params(axis='x', colors=self.fg_color.name())
        self.ax.grid(True, color='#4C566A')

        # Position legends
        self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
        self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))

        # Embed plot
        self.canvas = FigureCanvas(self.fig)
        plot_layout.addWidget(self.canvas)

    # ------------------------------------------------------------------
    # Device / session management
    # ------------------------------------------------------------------
    def _end_session(self):
        """End and discard the current pysmu session (best effort).

        Returns:
            bool: True when a session object existed.
        """
        if not hasattr(self, 'session'):
            return False
        try:
            if self.session_active:
                self.session.end()
        except Exception as e:
            print(f"Error cleaning up session: {e}")
        finally:
            # Always drop the reference, even when end() failed.
            try:
                del self.session
            except AttributeError:
                pass
        return True

    def _set_safe_state(self):
        """Put channel A into HI_Z with a zero setpoint (best effort)."""
        if not hasattr(self, 'dev'):
            return
        try:
            self.dev.channels['A'].mode = pysmu.Mode.HI_Z
            self.dev.channels['A'].constant(0)
        except Exception as e:
            print(f"Error resetting device: {e}")

    def _stop_measurement_thread(self):
        """Stop the measurement thread, if any, and forget it."""
        thread = self.measurement_thread
        self.measurement_thread = None
        self.measurement_worker = None
        if thread is None:
            return
        try:
            if thread.isRunning():
                thread.stop()
            if thread.isRunning():
                # Did not finish in time: keep a reference so the QThread
                # object is not destroyed while its thread still runs.
                self._lingering_threads.append(thread)
        except Exception as e:
            print(f"Error stopping measurement thread: {e}")

    def _restart_time_base(self):
        """Make elapsed time start again from zero for new readings."""
        if self.measurement_thread is not None:
            self.measurement_thread.start_time = time.time()

    def init_device(self):
        """Initialize the ADALM1000 device with continuous measurement"""
        try:
            # Clean up any existing session
            if self._end_session():
                time.sleep(1)

            self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
            if not self.session.devices:
                raise Exception("No ADALM1000 detected - check connections")

            self.dev = self.session.devices[0]
            # Reset channels
            self.dev.channels['A'].mode = pysmu.Mode.HI_Z
            self.dev.channels['B'].mode = pysmu.Mode.HI_Z
            self.dev.channels['A'].constant(0)
            self.dev.channels['B'].constant(0)

            self.session.start(0)

            self.status_light.setStyleSheet("background-color: green; border-radius: 10px;")
            self.connection_label.setText("Connected")
            self.status_bar.setText("Device connected | Ready to measure")
            self.session_active = True
            self.start_button.setEnabled(True)

            # Start measurement thread
            self.start_measurement_thread()
        except Exception as e:
            self.handle_device_error(e)

    def start_measurement_thread(self):
        """Start the continuous measurement thread"""
        self._stop_measurement_thread()

        self.measurement_thread = MeasurementThread(self.dev, self.interval)
        # Kept as an alias of the thread object for existing references.
        self.measurement_worker = self.measurement_thread
        self.measurement_thread.update_signal.connect(self.update_measurements)
        self.measurement_thread.error_signal.connect(self.handle_device_error)
        self.measurement_thread.start()

    # ------------------------------------------------------------------
    # Live data handling
    # ------------------------------------------------------------------
    @pyqtSlot(float, float, float)
    def update_measurements(self, voltage, current, current_time):
        """Update measurements from the measurement thread"""
        # The time base restarts for every test; if a reading is older than
        # the restart, the buffers hold stale points - drop them.
        if self.time_data and current_time < self.time_data[-1]:
            self.time_data.clear()
            self.voltage_data.clear()
            self.current_data.clear()

        self.time_data.append(current_time)
        self.voltage_data.append(voltage)
        self.current_data.append(current)

        # Update display
        self.voltage_label.setText(f"{voltage:.4f}")
        self.current_label.setText(f"{current:.4f}")
        self.time_label.setText(self.format_time(current_time))

        # Update plot periodically
        if len(self.time_data) % 10 == 0:
            self.update_plot()

        # Log data if test is running
        if self.test_running:
            with self._log_lock:
                if self.current_cycle_file is not None:
                    self.log_buffer.append([
                        f"{current_time:.3f}",
                        f"{voltage:.6f}",
                        f"{current:.6f}",
                        self.test_phase,
                        f"{self.capacity_ah:.4f}",
                        f"{self.charge_capacity:.4f}",
                        f"{self.coulomb_efficiency:.1f}",
                        f"{self.cycle_count}",
                    ])
                    # Write in chunks of 10 samples
                    if len(self.log_buffer) >= 10:
                        self._flush_log_buffer_locked()

    def _latest_sample(self):
        """Return the newest ``(voltage, current)`` pair, or None if empty.

        The GUI thread may clear the buffers at any time, so an empty buffer
        is handled instead of being checked for in advance.
        """
        try:
            return self.voltage_data[-1], self.current_data[-1]
        except IndexError:
            return None

    # ------------------------------------------------------------------
    # Log file helpers (call the *_locked variants with _log_lock held)
    # ------------------------------------------------------------------
    def _flush_log_buffer_locked(self):
        """Write buffered rows through the single open CSV writer."""
        if not self.log_buffer or self.log_writer is None:
            return
        try:
            self.log_writer.writerows(self.log_buffer)
            self.current_cycle_file.flush()
            self.log_buffer.clear()
        except Exception as e:
            print(f"Error flushing log buffer: {e}")

    def _close_log_file_locked(self):
        """Flush pending rows and close the current cycle file, if open."""
        if self.current_cycle_file is None:
            return
        self._flush_log_buffer_locked()
        try:
            self.current_cycle_file.close()
        except Exception as e:
            print(f"Error closing log file: {e}")
        self.current_cycle_file = None
        self.log_writer = None

    def create_cycle_log_file(self):
        """Create a new log file for the current cycle

        Returns:
            bool: True when the file was created, False otherwise (an error
            dialog is requested on the GUI thread in that case).
        """
        error_text = None
        with self._log_lock:
            # Close previous file if exists
            self._close_log_file_locked()

            if self.base_filename is None:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")

            # Check write permissions
            if not os.access(self.log_dir, os.W_OK):
                error_text = f"No write permissions in {self.log_dir}"
            else:
                # Create new log file with sequential suffix
                suffix = 1
                self.filename = f"{self.base_filename}_{suffix}.csv"
                while os.path.exists(self.filename):
                    suffix += 1
                    self.filename = f"{self.base_filename}_{suffix}.csv"

                try:
                    log_file = open(self.filename, 'w', newline='')
                    writer = csv.writer(log_file)
                    writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase",
                                     "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
                                     "Coulomb_Eff(%)", "Cycle"])
                    self.current_cycle_file = log_file
                    self.log_writer = writer
                    self.log_buffer = []
                except Exception as e:
                    error_text = f"Failed to create log file: {e}"

        # Emit outside the lock: the dialog may run on this very thread.
        if error_text is not None:
            self._error_dialog_signal.emit("Error", error_text)
            return False
        return True

    def write_cycle_summary(self):
        """Write cycle summary to the current cycle's log file"""
        summary_line = (
            f"Cycle {self.cycle_count} Summary - "
            f"Discharge={self.capacity_ah:.4f}Ah, "
            f"Charge={self.charge_capacity:.4f}Ah, "
            f"Efficiency={self.coulomb_efficiency:.1f}%"
        )
        with self._log_lock:
            if self.current_cycle_file is None:
                return
            # Pending rows go first so the summary ends the cycle's data.
            self._flush_log_buffer_locked()
            try:
                self.current_cycle_file.write(summary_line + "\n")
                self.current_cycle_file.flush()
            except Exception as e:
                print(f"Error writing cycle summary: {e}")

    # ------------------------------------------------------------------
    # Test control
    # ------------------------------------------------------------------
    def _read_test_params(self):
        """Parse and validate the test inputs.

        Returns:
            dict: ``test_current`` (A), ``charge_cutoff`` (V),
            ``discharge_cutoff`` (V) and ``rest_seconds``.

        Raises:
            ValueError: when an input is not a number or fails validation.
        """
        capacity = float(self.capacity_input.text())
        charge_cutoff = float(self.charge_cutoff_input.text())
        discharge_cutoff = float(self.discharge_cutoff_input.text())
        c_rate = float(self.c_rate_input.text())
        rest_hours = float(self.rest_time_input.text())

        if capacity <= 0:
            raise ValueError("Battery capacity must be positive")
        if charge_cutoff <= discharge_cutoff:
            raise ValueError("Charge cutoff must be higher than discharge cutoff")
        if c_rate <= 0:
            raise ValueError("C-rate must be positive")
        if rest_hours < 0:
            raise ValueError("Rest time cannot be negative")

        test_current = c_rate * capacity
        if test_current > 0.2:
            raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")

        return {
            "test_current": test_current,
            "charge_cutoff": charge_cutoff,
            "discharge_cutoff": discharge_cutoff,
            "rest_seconds": rest_hours * 3600,
        }

    def start_test(self):
        """Start the full battery test cycle"""
        if self.test_running:
            return
        if self.test_thread is not None and self.test_thread.is_alive():
            # The previous sequence is still winding down.
            self.status_bar.setText("Previous test is still stopping - please wait")
            return

        try:
            # Inputs are read once here, on the GUI thread; the test thread
            # works from this snapshot.
            params = self._read_test_params()
            self._test_params = params
            test_current = params["test_current"]
            discharge_cutoff = params["discharge_cutoff"]

            self.continuous_mode = self.continuous_check.isChecked()

            # Reset timing for new test
            self.measurement_start_time = time.time()
            self.test_start_time = time.time()
            self._restart_time_base()

            # Clear previous data
            self.time_data.clear()
            self.voltage_data.clear()
            self.current_data.clear()
            self.phase_data.clear()
            self.capacity_ah = 0.0
            self.charge_capacity = 0.0
            self.coulomb_efficiency = 0.0
            self.cycle_count = 0

            # Reset plot
            self.reset_plot()

            # Generate base filename without cycle number
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
            with self._log_lock:
                self._close_log_file_locked()
                self.log_buffer = []

            # Start test
            self.request_stop = False
            self._test_finalized = False
            self.test_running = True
            self.start_time = time.time()
            self.last_update_time = time.time()
            self.test_phase = "Initial Discharge"
            self.phase_label.setText(self.test_phase)

            self.start_button.setEnabled(False)
            self.stop_button.setEnabled(True)
            self.status_bar.setText(
                f"Test started | Discharging to {discharge_cutoff}V @ {test_current:.3f}A")

            # Start test sequence in a new thread
            self.test_thread = threading.Thread(target=self.run_test_sequence, daemon=True)
            self.test_thread.start()
        except Exception as e:
            self.test_running = False
            QMessageBox.critical(self, "Error", str(e))

    @staticmethod
    def format_time(seconds):
        """Convert seconds to hh:mm:ss format"""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        seconds = int(seconds % 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    def stop_test(self):
        """Request immediate stop of the test and reset the live plot.

        The measured capacities are kept so that ``finalize_test`` can report
        them; they are reset when the next test starts.
        """
        if not self.test_running:
            return

        self.request_stop = True
        self.test_running = False
        self.measuring = False
        self.test_phase = "Idle"
        self.phase_label.setText(self.test_phase)

        # Immediately set device to safe state
        self._set_safe_state()

        # Clear plot buffers and restart the time axis
        self._restart_time_base()
        self.phase_data.clear()
        self.reset_plot()

        # Update UI; the start button is re-enabled by finalize_test once
        # the test thread has really finished.
        self.status_bar.setText("Stopping test...")
        self.stop_button.setEnabled(False)

        # Normally the test thread reports completion itself; finalize
        # directly only when there is no live thread to do so.
        if self.test_thread is None or not self.test_thread.is_alive():
            QTimer.singleShot(100, self.finalize_test)

    def _should_stop(self):
        """True when the running sequence must wind down."""
        return self.request_stop or not self.test_running

    def _set_phase(self, phase):
        """Record the phase name and show it (callable from the test thread)."""
        self.test_phase = phase
        self._post_text(self.phase_label, phase)

    def _charge_phase(self, test_current, charge_cutoff):
        """Source ``test_current`` until the voltage reaches ``charge_cutoff``."""
        self._set_phase("Charge")
        self._post_text(self.status_bar,
                        f"Charging to {charge_cutoff}V @ {test_current:.3f}A")

        self.measuring = True
        self.dev.channels['B'].mode = pysmu.Mode.HI_Z
        self.dev.channels['A'].mode = pysmu.Mode.SIMV
        self.dev.channels['A'].constant(test_current)
        self.charge_capacity = 0.0
        self._post_text(self.charge_capacity_label, f"{self.charge_capacity:.4f}")
        self.last_update_time = time.time()

        while not self._should_stop():
            sample = self._latest_sample()
            if sample is None:
                time.sleep(0.1)
                continue
            current_voltage = sample[0]
            measured_current = abs(sample[1])

            # Update charge capacity (A * s / 3600 = Ah)
            now = time.time()
            delta_t = now - self.last_update_time
            self.last_update_time = now
            self.charge_capacity += measured_current * delta_t / 3600
            self._post_text(self.charge_capacity_label, f"{self.charge_capacity:.4f}")

            self._post_text(
                self.status_bar,
                f"Charging: {current_voltage:.3f}V / {charge_cutoff}V | "
                f"Current: {measured_current:.3f}A | "
                f"Capacity: {self.charge_capacity:.4f}Ah"
            )

            if current_voltage >= charge_cutoff:
                break
            time.sleep(0.1)  # More frequent checks

    def _discharge_phase(self, test_current, discharge_cutoff):
        """Sink ``test_current`` until the voltage drops to ``discharge_cutoff``."""
        self._set_phase("Discharge")
        self._post_text(self.status_bar,
                        f"Discharging to {discharge_cutoff}V @ {test_current:.3f}A")

        self.measuring = True
        self.dev.channels['A'].mode = pysmu.Mode.SIMV
        self.dev.channels['A'].constant(-test_current)
        self.capacity_ah = 0.0
        self._post_text(self.capacity_label, f"{self.capacity_ah:.4f}")
        self.last_update_time = time.time()

        while not self._should_stop():
            sample = self._latest_sample()
            if sample is None:
                time.sleep(0.1)
                continue
            current_voltage = sample[0]
            current_current = abs(sample[1])

            # Capacity calculation (A * s / 3600 = Ah)
            now = time.time()
            delta_t = now - self.last_update_time
            self.last_update_time = now
            self.capacity_ah += current_current * delta_t / 3600
            self._post_text(self.capacity_label, f"{self.capacity_ah:.4f}")

            if self.continuous_mode and not self._continuous_checked:
                # The user unticked the box mid-cycle: finish this cycle only.
                self.continuous_mode = False
                self._post_text(
                    self.status_bar,
                    f"Continuous Mode disabled | "
                    f"Discharging to {discharge_cutoff}V (will stop after this cycle) | "
                    f"Current: {current_current:.3f}A | "
                    f"Capacity: {self.capacity_ah:.4f}Ah"
                )
            else:
                # Default status message
                self._post_text(
                    self.status_bar,
                    f"Discharging: {current_voltage:.3f}V / {discharge_cutoff}V | "
                    f"Current: {current_current:.3f}A | "
                    f"Capacity: {self.capacity_ah:.4f}Ah"
                )

            if current_voltage <= discharge_cutoff:
                break
            time.sleep(0.1)  # avoid spinning at full CPU between readings

    def _rest_phase(self, phase_name, status_prefix, rest_seconds):
        """Hold channel A in HI_Z for ``rest_seconds`` or until stopped."""
        self._set_phase(phase_name)
        self.measuring = False
        self.dev.channels['A'].mode = pysmu.Mode.HI_Z
        self.dev.channels['A'].constant(0)

        rest_end_time = time.time() + rest_seconds
        while time.time() < rest_end_time and not self._should_stop():
            time_left = max(0, rest_end_time - time.time())
            self._post_text(
                self.status_bar,
                f"{status_prefix} | "
                f"Time left: {time_left/60:.1f} min"
            )
            time.sleep(1)  # Check every second for stop request

    def run_test_sequence(self):
        """Run charge -> rest -> discharge cycles (executes in the test thread).

        Widgets are never touched here; UI updates are posted through
        signals. Whatever happens, completion is reported exactly once via
        ``_test_done_signal`` so that ``finalize_test`` runs on the GUI thread.
        """
        try:
            params = self._test_params or self._read_test_params()
            test_current = params["test_current"]
            charge_cutoff = params["charge_cutoff"]
            discharge_cutoff = params["discharge_cutoff"]
            rest_seconds = params["rest_seconds"]

            while self.test_running and (self.continuous_mode or self.cycle_count == 0):
                # Reset stop request at start of each cycle
                self.request_stop = False
                self.cycle_count += 1
                self._post_text(self.cycle_label, f"{self.cycle_count}")

                # Create new log file for this cycle
                self.create_cycle_log_file()

                # 1. Charge phase (constant current)
                self._charge_phase(test_current, charge_cutoff)
                if self._should_stop():
                    break

                # 2. Rest period after charge
                self._rest_phase("Resting (Post-Charge)", "Resting after charge",
                                 rest_seconds)
                if self._should_stop():
                    break

                # 3. Discharge phase (capacity measurement)
                self._discharge_phase(test_current, discharge_cutoff)
                stopped = self._should_stop()

                # Calculate Coulomb efficiency if not stopping
                if not stopped and self.charge_capacity > 0:
                    self.coulomb_efficiency = (self.capacity_ah / self.charge_capacity) * 100
                    self._post_text(self.efficiency_label, f"{self.coulomb_efficiency:.1f}")

                if not stopped:
                    # Update cycle info
                    self._post_text(
                        self.status_bar,
                        f"Cycle {self.cycle_count} complete | "
                        f"Discharge: {self.capacity_ah:.3f}Ah | "
                        f"Charge: {self.charge_capacity:.3f}Ah | "
                        f"Efficiency: {self.coulomb_efficiency:.1f}%"
                    )

                # Write cycle summary to log file (also flushes pending rows)
                self.write_cycle_summary()

                if stopped:
                    break

                if not self._continuous_checked:
                    self.test_running = False
                    self._set_phase("Idle")
                    break  # Exit the main test loop

                # 4. Rest period after discharge
                self._rest_phase("Resting (Post-Discharge)", "Resting after discharge",
                                 rest_seconds)
        except Exception as e:
            self._error_dialog_signal.emit("Test Error", str(e))
        finally:
            # Finalize test if stopped or completed
            self._test_done_signal.emit()

    @pyqtSlot()
    def finalize_test(self):
        """Final cleanup after test completes or is stopped

        Runs at most once per test; later calls are ignored until
        ``start_test`` begins a new test.
        """
        if self._test_finalized:
            return
        self._test_finalized = True

        self.measuring = False
        self.test_running = False
        self._set_safe_state()

        # Flush and close current log file
        with self._log_lock:
            self._close_log_file_locked()

        # Only allow a new test while a device is connected.
        self.start_button.setEnabled(self.session_active)
        self.stop_button.setEnabled(False)
        self.request_stop = False

        message = (
            f"Test safely stopped after discharge phase | "
            f"Cycle {self.cycle_count} completed | "
            f"Final capacity: {self.capacity_ah:.3f}Ah"
        )
        self.status_bar.setText(message)

        QMessageBox.information(
            self,
            "Test Completed",
            f"Test was safely stopped after discharge phase.\n\n"
            f"Final discharge capacity: {self.capacity_ah:.3f}Ah\n"
            f"Total cycles completed: {self.cycle_count}"
        )

    # ------------------------------------------------------------------
    # Plot handling
    # ------------------------------------------------------------------
    def reset_plot(self):
        """Reset the plot completely for a new test"""
        # Clear the data lines
        self.line_voltage.set_data([], [])
        self.line_current.set_data([], [])

        # Reset the data buffers
        self.time_data.clear()
        self.voltage_data.clear()
        self.current_data.clear()

        self.ax.set_xlim(0, 10)  # 10s initial range

        # Set reasonable initial axis ranges; keep the current ranges when an
        # input field does not hold a number.
        try:
            voltage_padding = 0.2
            min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding)
            max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding
            self.ax.set_ylim(min_voltage, max_voltage)

            current_padding = 0.05
            test_current = (float(self.c_rate_input.text())
                            * float(self.capacity_input.text()))
            max_current = test_current * 1.5  # 50% padding
            self.ax2.set_ylim(-max_current - current_padding,
                              max_current + current_padding)
        except ValueError as e:
            print(f"Could not derive axis limits from inputs: {e}")

        # Force redraw
        self.canvas.draw()

    def update_plot(self):
        """Redraw the curves, at most once per second after the first points."""
        if not self.time_data:
            return

        # Force update more frequently at start of test
        if (len(self.time_data) < 10
                or (time.time() - getattr(self, '_last_plot_time', 0)) > 1.0):
            times = list(self.time_data)
            self.line_voltage.set_data(times, list(self.voltage_data))
            self.line_current.set_data(times, list(self.current_data))
            self.auto_scale_axes()
            self.canvas.draw_idle()
            self._last_plot_time = time.time()

    def auto_scale_axes(self):
        """Auto-scale plot axes with appropriate padding and strict boundaries"""
        if not self.time_data:
            return

        # X-axis scaling with 5% padding but don't exceed current max time
        min_time = 0
        max_time = self.time_data[-1]
        current_xlim = self.ax.get_xlim()

        # Only expand axis if new data exceeds current view
        if max_time > current_xlim[1] * 0.95:  # 95% threshold to start expanding
            new_max = max_time * 1.05  # 5% padding
            self.ax.set_xlim(min_time, new_max)
            self.ax2.set_xlim(min_time, new_max)

        # Voltage axis scaling with strict boundaries
        voltage_padding = 0.2
        if self.voltage_data:
            min_voltage = max(0, min(self.voltage_data) - voltage_padding)
            max_voltage = min(5.0, max(self.voltage_data) + voltage_padding)  # 5V hardware limit
            current_ylim = self.ax.get_ylim()
            if (abs(current_ylim[0] - min_voltage) > 0.1
                    or abs(current_ylim[1] - max_voltage) > 0.1):
                self.ax.set_ylim(min_voltage, max_voltage)

        # Current axis scaling with strict boundaries
        current_padding = 0.05
        if self.current_data:
            min_current = max(-0.25, min(self.current_data) - current_padding)  # -250mA limit
            max_current = min(0.25, max(self.current_data) + current_padding)  # +250mA limit
            current_ylim2 = self.ax2.get_ylim()
            if (abs(current_ylim2[0] - min_current) > 0.02
                    or abs(current_ylim2[1] - max_current) > 0.02):
                self.ax2.set_ylim(min_current, max_current)

    # ------------------------------------------------------------------
    # Error handling / reconnect
    # ------------------------------------------------------------------
    @pyqtSlot(str)
    def handle_device_error(self, error_msg):
        """Handle device connection errors"""
        error_msg = str(error_msg)
        print(f"Device error: {error_msg}")

        # Clean up session first
        self._end_session()

        # Update UI
        self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
        self.connection_label.setText("Disconnected")
        self.status_bar.setText(f"Device error: {error_msg}")

        self.session_active = False
        self.test_running = False
        self.continuous_mode = False
        self.measuring = False

        self.start_button.setEnabled(False)
        self.stop_button.setEnabled(False)

        # Clear plot + buffers
        self.time_data.clear()
        self.voltage_data.clear()
        self.current_data.clear()
        if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'):
            self.line_voltage.set_data([], [])
            self.line_current.set_data([], [])
            self.ax.set_xlim(0, 1)
            self.ax2.set_xlim(0, 1)
            self.canvas.draw()

        # Show error message and attempt reconnect automatically; several
        # errors in a row must not stack several dialogs/retry chains.
        if not self._reconnect_scheduled:
            self._reconnect_scheduled = True
            QTimer.singleShot(100, self.attempt_reconnect)

    def attempt_reconnect(self):
        """Attempt to reconnect automatically"""
        try:
            # Show error message first
            QMessageBox.critical(
                self,
                "Device Connection Error",
                "Could not connect to ADALM1000\n\n"
                "1. Check USB cable connection\n"
                "2. The device will attempt to reconnect automatically"
            )
        except Exception as e:
            print(f"Error showing message: {e}")
            self._reconnect_scheduled = False
            return

        # Schedule reconnect attempt
        QTimer.singleShot(1000, self.reconnect_device)

    def reconnect_device(self):
        """Reconnect the device with proper cleanup"""
        self._reconnect_scheduled = False
        if self._reconnect_running:
            return  # a reconnect is already on its way
        self._reconnect_running = True

        self.status_bar.setText("Attempting to reconnect...")

        # Stop the reader before ending the session it reads from.
        self._stop_measurement_thread()

        # Clean up any existing session
        self._end_session()

        # Reset device state
        self.session_active = False
        self.test_running = False
        self.continuous_mode = False
        self.measuring = False

        # Give the device time to reset without blocking the GUI thread.
        QTimer.singleShot(1500, self._finish_reconnect)

    def _finish_reconnect(self):
        """Second half of ``reconnect_device``: open the device again."""
        self._reconnect_running = False
        # On failure init_device reports through handle_device_error, which
        # schedules the next attempt - no extra retry is scheduled here.
        self.init_device()
        if self.session_active:
            self.status_bar.setText("Reconnected successfully")
        else:
            self.status_bar.setText("Reconnect failed - will retry...")

    def closeEvent(self, event):
        """Clean up on window close"""
        # Set flags to stop all threads
        self.request_stop = True
        self.test_running = False
        self.measuring = False
        self._test_finalized = True  # no completion dialog while closing

        self._stop_measurement_thread()
        if self.test_thread is not None and self.test_thread.is_alive():
            self.test_thread.join(timeout=2.0)

        # Leave the output disabled before the session goes away.
        self._set_safe_state()

        with self._log_lock:
            self._close_log_file_locked()

        # Clean up device session
        if hasattr(self, 'session') and self.session:
            try:
                self.session.end()
            except Exception as e:
                print(f"Error ending session: {e}")
        self.session_active = False

        event.accept()


if __name__ == "__main__":
    app = QApplication([])
    # Set application style
    app.setStyle('Fusion')
    # Create and show main window
    window = BatteryTester()
    window.show()
    # Run application
    app.exec_()