# -*- coding: utf-8 -*- import os import time import csv import threading from datetime import datetime import numpy as np import matplotlib matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure from collections import deque from queue import Queue, Full, Empty from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog) from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread from PyQt5 import sip import pysmu class DeviceDisconnectedError(Exception): pass class MeasurementThread(QThread): update_signal = pyqtSignal(float, float, float) error_signal = pyqtSignal(str) def __init__(self, device, interval=0.1): super().__init__() self.device = device self.interval = interval self._running = False self.filter_window_size = 10 self.voltage_window = [] self.current_window = [] self.start_time = time.time() self.measurement_queue = Queue(maxsize=1) def run(self): """Continuous measurement loop""" self._running = True while self._running: try: samples = self.device.read(self.filter_window_size, 500, True) if not samples: raise DeviceDisconnectedError("No samples received") current_time = time.time() - self.start_time # Get voltage from Channel B (HI_Z mode) and current from Channel A raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage raw_current = np.mean([s[0][1] for s in samples]) # Channel A current # Apply sign correction based on test phase if available if hasattr(self, 'parent') and hasattr(self.parent, 'test_phase'): if self.parent.test_phase == "Discharge": raw_current = -abs(raw_current) elif self.parent.test_phase == "Charge": raw_current = abs(raw_current) # Update filter windows self.voltage_window.append(raw_voltage) self.current_window.append(raw_current) if len(self.voltage_window) > self.filter_window_size: self.voltage_window.pop(0) self.current_window.pop(0) voltage = np.mean(self.voltage_window) current = np.mean(self.current_window) # Validate measurements if not (0 <= voltage <= 5.0): raise ValueError(f"Invalid voltage: {voltage}V") if not (-0.25 <= current <= 0.25): raise ValueError(f"Invalid current: {current}A") # Emit update self.update_signal.emit(voltage, current, current_time) # Store measurement try: self.measurement_queue.put_nowait((voltage, current)) except Full: pass time.sleep(max(0.05, self.interval)) except Exception as e: self.error_signal.emit(f"Read error: {str(e)}") time.sleep(1) continue def stop(self): self._running = False self.wait(500) class TestSequenceWorker(QObject): finished = pyqtSignal() update_phase = pyqtSignal(str) update_status = pyqtSignal(str) test_completed = pyqtSignal() error_occurred = pyqtSignal(str) def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent): super().__init__() self.device = device self.test_current = test_current self.charge_cutoff = charge_cutoff self.discharge_cutoff = discharge_cutoff self.rest_time = rest_time * 3600 # Convert hours to seconds self.continuous_mode = continuous_mode self.parent = parent self._running = True self.voltage_timeout = 0.5 # seconds def get_latest_measurement(self): """Thread-safe measurement reading with timeout""" try: return self.parent.measurement_thread.measurement_queue.get( timeout=self.voltage_timeout ) except Empty: return (None, None) # Return tuple for unpacking def charge_phase(self): """Handle the battery charging phase""" self.update_phase.emit("Charge") self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.3f}A") try: # Configure channels - Channel A sources current, Channel B measures voltage self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV self.device.channels['A'].constant(self.test_current) # Small delay to allow current to stabilize time.sleep(0.1) while self._running: voltage, current = self.get_latest_measurement() if voltage is None: continue # Update parent's data for logging/display with self.parent.plot_mutex: if len(self.parent.voltage_data) > 0: self.parent.voltage_data[-1] = voltage self.parent.current_data[-1] = current if voltage >= self.charge_cutoff: break time.sleep(0.1) finally: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) def discharge_phase(self): """Handle the battery discharging phase""" voltage, _ = self.get_latest_measurement() if voltage is not None and voltage <= self.discharge_cutoff: self.update_status.emit(f"Already below discharge cutoff ({voltage:.3f}V ≤ {self.discharge_cutoff}V)") return self.update_phase.emit("Discharge") self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A") try: # Configure channels - Channel A sinks current, Channel B measures voltage self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV self.device.channels['A'].constant(-self.test_current) # Small delay to allow current to stabilize time.sleep(0.1) while self._running: voltage, current = self.get_latest_measurement() if voltage is None: continue # Update parent's data for logging/display with self.parent.plot_mutex: if len(self.parent.voltage_data) > 0: self.parent.voltage_data[-1] = voltage self.parent.current_data[-1] = current if voltage <= self.discharge_cutoff: break time.sleep(0.1) finally: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) def rest_phase(self, phase_name): """Handle rest period between phases""" self.update_phase.emit(f"Resting ({phase_name})") rest_end = time.time() + self.rest_time while time.time() < rest_end and self._running: time_left = max(0, rest_end - time.time()) self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min") time.sleep(1) def stop(self): """Request the thread to stop""" self._running = False try: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) self.device.channels['B'].mode = pysmu.Mode.HI_Z except Exception as e: print(f"Error stopping device: {e}") def run(self): """Main test sequence loop""" try: first_cycle = True # Ensure at least one cycle runs # Modified while condition to also check parent's continuous_mode state while (self._running and (self.parent.continuous_mode_check.isChecked() or first_cycle)): self.parent.request_stop = False self.parent.cycle_count += 1 first_cycle = False # Only True for the first cycle # Existing test phases... # 1. Charge phase (constant current) self.charge_phase() if not self._running or self.parent.request_stop: break # 2. Rest period after charge self.rest_phase("Post-Charge") if not self._running or self.parent.request_stop: break # 3. Discharge phase (capacity measurement) self.discharge_phase() if not self._running or self.parent.request_stop: break # 4. Rest period after discharge (only if not stopping) if self._running and not self.parent.request_stop: self.rest_phase("Post-Discharge") # Calculate Coulomb efficiency if not stopping if not self.parent.request_stop and self.parent.charge_capacity > 0: self.parent.coulomb_efficiency = ( self.parent.capacity_ah / self.parent.charge_capacity ) * 100 # Test completed self.test_completed.emit() except Exception as e: self.error_occurred.emit(f"Test sequence error: {str(e)}") finally: self.finished.emit() class BatteryTester(QMainWindow): def __init__(self): self.plot_mutex = threading.Lock() super().__init__() self.last_logged_phase = None # Color scheme self.bg_color = "#2E3440" self.fg_color = "#D8DEE9" self.accent_color = "#5E81AC" self.warning_color = "#BF616A" self.success_color = "#A3BE8C" # Device and measurement state self.session_active = False self.measuring = False self.test_running = False self.continuous_mode = False self.request_stop = False self.interval = 0.1 self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) # Data buffers self.time_data = deque() self.voltage_data = deque() self.current_data = deque() self.phase_data = deque() # Initialize UI and device self.setup_ui() self.init_device() # Set window properties self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)") self.resize(1000, 800) self.setMinimumSize(800, 700) # Status update timer self.status_timer = QTimer() self.status_timer.timeout.connect(self.update_status) self.status_timer.start(1000) # Update every second def setup_ui(self): """Configure the user interface""" # Main widget and layout self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.main_layout = QVBoxLayout(self.central_widget) self.main_layout.setContentsMargins(10, 10, 10, 10) # Header area header_frame = QFrame() header_frame.setFrameShape(QFrame.NoFrame) header_layout = QHBoxLayout(header_frame) header_layout.setContentsMargins(0, 0, 0, 0) self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)") self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};") header_layout.addWidget(self.title_label, 1) # Status indicator self.status_light = QLabel() self.status_light.setFixedSize(20, 20) self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") header_layout.addWidget(self.status_light) self.connection_label = QLabel("Disconnected") header_layout.addWidget(self.connection_label) # Reconnect button self.reconnect_btn = QPushButton("Reconnect") self.reconnect_btn.clicked.connect(self.reconnect_device) header_layout.addWidget(self.reconnect_btn) self.main_layout.addWidget(header_frame) # Measurement display display_frame = QFrame() display_frame.setFrameShape(QFrame.StyledPanel) display_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") display_layout = QGridLayout(display_frame) # Measurement values measurement_labels = [ ("Voltage", "V"), ("Current", "A"), ("Test Phase", ""), ("Elapsed Time", "s"), ("Discharge Capacity", "Ah"), ("Charge Capacity", "Ah"), ("Coulomb Eff.", "%"), ("Cycle Count", ""), ("Battery Temp", "°C"), ("Internal R", "Ω"), ("Power", "W"), ("Energy", "Wh") ] # 4 Zeilen × 3 Spalten Anordnung for i, (label, unit) in enumerate(measurement_labels): row = i // 3 # 0-3 (4 Zeilen) col = (i % 3) * 3 # 0, 3, 6 (3 Spalten mit je 3 Widgets) # Label für den Messwertnamen lbl = QLabel(f"{label}:") lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;") display_layout.addWidget(lbl, row, col) # Label für den Messwert value_lbl = QLabel("0.000") value_lbl.setStyleSheet(f""" color: {self.fg_color}; font-weight: bold; font-size: 12px; min-width: 60px; """) display_layout.addWidget(value_lbl, row, col + 1) # Einheit falls vorhanden if unit: unit_lbl = QLabel(unit) unit_lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;") display_layout.addWidget(unit_lbl, row, col + 2) # Spaltenabstände anpassen for i in range(9): # 3 Spalten × 3 Widgets display_layout.setColumnStretch(i, 1 if i % 3 == 1 else 0) # Nur Wert-Spalten dehnen # Referenzen aktualisieren self.voltage_label = display_layout.itemAtPosition(0, 1).widget() self.current_label = display_layout.itemAtPosition(0, 4).widget() self.phase_label = display_layout.itemAtPosition(0, 7).widget() self.time_label = display_layout.itemAtPosition(1, 1).widget() self.capacity_label = display_layout.itemAtPosition(1, 4).widget() self.charge_capacity_label = display_layout.itemAtPosition(1, 7).widget() self.efficiency_label = display_layout.itemAtPosition(2, 1).widget() self.cycle_label = display_layout.itemAtPosition(2, 4).widget() self.temp_label = display_layout.itemAtPosition(2, 7).widget() self.resistance_label = display_layout.itemAtPosition(3, 1).widget() self.power_label = display_layout.itemAtPosition(3, 4).widget() self.energy_label = display_layout.itemAtPosition(3, 7).widget() self.main_layout.addWidget(display_frame) # Control area controls_frame = QFrame() controls_frame.setFrameShape(QFrame.NoFrame) controls_layout = QHBoxLayout(controls_frame) controls_layout.setContentsMargins(0, 0, 0, 0) # Parameters frame params_frame = QFrame() params_frame.setFrameShape(QFrame.StyledPanel) params_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") params_layout = QGridLayout(params_frame) # Battery capacity self.capacity = 0.2 self.capacity_label_input = QLabel("Battery Capacity (Ah):") self.capacity_label_input.setStyleSheet(f"color: {self.fg_color};") params_layout.addWidget(self.capacity_label_input, 0, 0) self.capacity_input = QLineEdit("0.2") self.capacity_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.capacity_input.setFixedWidth(60) params_layout.addWidget(self.capacity_input, 0, 1) # Charge cutoff self.charge_cutoff = 1.43 self.charge_cutoff_label = QLabel("Charge Cutoff (V):") self.charge_cutoff_label.setStyleSheet(f"color: {self.fg_color};") params_layout.addWidget(self.charge_cutoff_label, 1, 0) self.charge_cutoff_input = QLineEdit("1.43") self.charge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.charge_cutoff_input.setFixedWidth(60) params_layout.addWidget(self.charge_cutoff_input, 1, 1) # Discharge cutoff self.discharge_cutoff = 0.9 self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):") self.discharge_cutoff_label.setStyleSheet(f"color: {self.fg_color};") params_layout.addWidget(self.discharge_cutoff_label, 2, 0) self.discharge_cutoff_input = QLineEdit("0.9") self.discharge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.discharge_cutoff_input.setFixedWidth(60) params_layout.addWidget(self.discharge_cutoff_input, 2, 1) # Rest time self.rest_time = 0.25 self.rest_time_label = QLabel("Rest Time (hours):") self.rest_time_label.setStyleSheet(f"color: {self.fg_color};") params_layout.addWidget(self.rest_time_label, 3, 0) self.rest_time_input = QLineEdit("0.25") self.rest_time_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.rest_time_input.setFixedWidth(60) params_layout.addWidget(self.rest_time_input, 3, 1) # C-rate for test self.c_rate = 0.1 self.c_rate_label = QLabel("Test C-rate:") self.c_rate_label.setStyleSheet(f"color: {self.fg_color};") params_layout.addWidget(self.c_rate_label, 0, 2) self.c_rate_input = QLineEdit("0.1") self.c_rate_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.c_rate_input.setFixedWidth(40) params_layout.addWidget(self.c_rate_input, 0, 3) c_rate_note = QLabel("(e.g., 0.2 for C/5)") c_rate_note.setStyleSheet(f"color: {self.fg_color};") params_layout.addWidget(c_rate_note, 0, 4) controls_layout.addWidget(params_frame, 1) # Test conditions input self.test_conditions_label = QLabel("Test Conditions/Chemistry:") self.test_conditions_label.setStyleSheet(f"color: {self.fg_color};") params_layout.addWidget(self.test_conditions_label, 4, 0) self.test_conditions_input = QLineEdit("") self.test_conditions_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.test_conditions_input.setFixedWidth(120) params_layout.addWidget(self.test_conditions_input, 4, 1) # Button frame button_frame = QFrame() button_frame.setFrameShape(QFrame.NoFrame) button_layout = QVBoxLayout(button_frame) button_layout.setContentsMargins(0, 0, 0, 0) self.start_button = QPushButton("START TEST") self.start_button.setStyleSheet(f""" QPushButton {{ background-color: {self.accent_color}; color: {self.fg_color}; font-weight: bold; padding: 6px; border-radius: 4px; }} QPushButton:disabled {{ background-color: #4C566A; color: #D8DEE9; }} """) self.start_button.clicked.connect(self.start_test) button_layout.addWidget(self.start_button) self.stop_button = QPushButton("STOP TEST") self.stop_button.setStyleSheet(f""" QPushButton {{ background-color: {self.warning_color}; color: {self.fg_color}; font-weight: bold; padding: 6px; border-radius: 4px; }} QPushButton:disabled {{ background-color: #4C566A; color: #D8DEE9; }} """) self.stop_button.clicked.connect(self.stop_test) self.stop_button.setEnabled(False) button_layout.addWidget(self.stop_button) # Continuous mode checkbox self.continuous_mode_check = QCheckBox("Continuous Mode") self.continuous_mode_check.setChecked(True) self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};") button_layout.addWidget(self.continuous_mode_check) self.continuous_mode_check.stateChanged.connect(self.handle_continuous_mode_change) controls_layout.addWidget(button_frame) self.main_layout.addWidget(controls_frame) # Plot area self.setup_plot() # Status bar self.status_bar = self.statusBar() self.status_bar.setStyleSheet(f"color: {self.fg_color};") self.status_bar.showMessage("Ready") # Apply dark theme self.setStyleSheet(f""" QMainWindow {{ background-color: {self.bg_color}; }} QLabel {{ color: {self.fg_color}; }} QLineEdit {{ background-color: #3B4252; color: {self.fg_color}; border: 1px solid #4C566A; border-radius: 3px; padding: 2px; }} """) def handle_continuous_mode_change(self, state): """Handle changes to continuous mode checkbox during operation""" if not state and self.test_running: # If unchecked during test self.status_bar.showMessage("Continuous mode disabled - will complete current cycle") # Optional visual feedback self.continuous_mode_check.setStyleSheet(f"color: {self.warning_color};") QTimer.singleShot(2000, lambda: self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")) def setup_plot(self): """Configure the matplotlib plot""" self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color) self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) self.ax = self.fig.add_subplot(111) self.ax.set_facecolor('#3B4252') # Set initial voltage range voltage_padding = 0.2 min_voltage = max(0, 0.9 - voltage_padding) max_voltage = 1.43 + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) # Voltage plot self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) self.ax.set_ylabel("Voltage (V)", color='#00BFFF') self.ax.tick_params(axis='y', labelcolor='#00BFFF') # Current plot (right axis) self.ax2 = self.ax.twinx() current_padding = 0.05 test_current = 0.1 * 0.2 # Default values max_current = test_current * 1.5 self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) self.ax2.set_ylabel("Current (A)", color='r') self.ax2.tick_params(axis='y', labelcolor='r') self.ax.set_xlabel('Time (s)', color=self.fg_color) self.ax.set_title('Battery Test (CC)', color=self.fg_color) self.ax.tick_params(axis='x', colors=self.fg_color) self.ax.grid(True, color='#4C566A') # Position legends self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) # Embed plot self.canvas = FigureCanvas(self.fig) self.canvas.setStyleSheet(f"background-color: {self.bg_color};") self.main_layout.addWidget(self.canvas, 1) def init_device(self): """Initialize the ADALM1000 device with continuous measurement""" try: # Clean up any existing session if hasattr(self, 'session'): try: self.session.end() del self.session except: pass time.sleep(1) self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) if not self.session.devices: raise Exception("No ADALM1000 detected - check connections") self.dev = self.session.devices[0] self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].constant(0) self.session.start(0) self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;") self.connection_label.setText("Connected") self.status_bar.showMessage("Device connected | Ready to measure") self.session_active = True self.start_button.setEnabled(True) # Start measurement thread self.measurement_thread = MeasurementThread(self.dev, self.interval) self.measurement_thread.update_signal.connect(self.update_measurements) self.measurement_thread.error_signal.connect(self.handle_device_error) # Start the QThread directly (no need for threading.Thread) self.measurement_thread.start() except Exception as e: self.handle_device_error(str(e)) @pyqtSlot(float, float, float) def update_measurements(self, voltage, current, current_time): try: # Nur Daten speichern, wenn der Test läuft if not self.test_running: return with self.plot_mutex: self.time_data.append(current_time) self.voltage_data.append(voltage) self.current_data.append(current) # Update display labels (immer) self.voltage_label.setText(f"{voltage:.4f}") self.current_label.setText(f"{current:.4f}") self.time_label.setText(self.format_time(current_time)) # Plot-Updates drosseln (max. 10Hz) now = time.time() if not hasattr(self, '_last_plot_update'): self._last_plot_update = 0 if now - self._last_plot_update >= 0.1: # 100ms minimum zwischen Updates self._last_plot_update = now QTimer.singleShot(0, self.update_plot) except Exception as e: print(f"Error in update_measurements: {e}") def update_status(self): """Update status information periodically""" now = time.time() if not hasattr(self, '_last_log_time'): self._last_log_time = now if self.test_running: # Update capacity calculations if in test mode if self.measuring and self.time_data: current_time = time.time() - self.start_time delta_t = current_time - self.last_update_time self.last_update_time = current_time if self.test_phase == "Discharge": current_current = abs(self.current_data[-1]) self.capacity_ah += current_current * delta_t / 3600 self.capacity_label.setText(f"{self.capacity_ah:.4f}") elif self.test_phase == "Charge": current_current = abs(self.current_data[-1]) self.charge_capacity += current_current * delta_t / 3600 self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}") # Logging (1x pro Sekunde) if hasattr(self, 'log_writer') and (now - self._last_log_time >= 1.0): if self.time_data: current_time = self.time_data[-1] voltage = self.voltage_data[-1] current = self.current_data[-1] self.log_writer.writerow([ f"{current_time:.3f}", f"{voltage:.6f}", f"{current:.6f}", self.test_phase, f"{self.capacity_ah:.4f}", f"{self.charge_capacity:.4f}", f"{self.coulomb_efficiency:.1f}", f"{self.cycle_count}" ]) self.current_cycle_file.flush() self._last_log_time = now # Zyklusbeginn erkennen: Wechsel von Resting → Charge if self.test_running: phase = self.test_phase if (phase == "Charge" and self.last_logged_phase != "Charge"): self.create_cycle_log_file() # Neue Datei für neuen Zyklus self.last_logged_phase = phase def start_test(self): """Start the full battery test cycle""" # Clean up any previous test if hasattr(self, 'test_sequence_thread'): self.test_sequence_thread.quit() self.test_sequence_thread.wait(500) if hasattr(self, 'test_sequence_worker'): self.test_sequence_worker.deleteLater() del self.test_sequence_thread # Reset stop flag self.request_stop = False if not self.test_running: try: # Get parameters from UI self.capacity = float(self.capacity_input.text()) self.charge_cutoff = float(self.charge_cutoff_input.text()) self.discharge_cutoff = float(self.discharge_cutoff_input.text()) self.rest_time = float(self.rest_time_input.text()) self.c_rate = float(self.c_rate_input.text()) # Validate inputs if self.capacity <= 0: raise ValueError("Battery capacity must be positive") if self.charge_cutoff <= self.discharge_cutoff: raise ValueError("Charge cutoff must be higher than discharge cutoff") if self.c_rate <= 0: raise ValueError("C-rate must be positive") test_current = self.c_rate * self.capacity if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") # Clear ALL previous data completely with self.plot_mutex: self.time_data.clear() self.voltage_data.clear() self.current_data.clear() self.phase_data.clear() # Reset capacities and timing self.start_time = time.time() self.last_update_time = self.start_time self.capacity_ah = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 # Reset measurement thread's timer and queues if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() self.measurement_thread.voltage_window.clear() self.measurement_thread.current_window.clear() with self.measurement_thread.measurement_queue.mutex: self.measurement_thread.measurement_queue.queue.clear() # Reset plot completely self.reset_plot() # Start test self.test_running = True self.start_time = time.time() self.last_update_time = time.time() self.test_phase = "Initial Discharge" self.phase_label.setText(self.test_phase) self.start_button.setEnabled(False) self.stop_button.setEnabled(True) self.status_bar.showMessage(f"Test started | Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A") # Start test sequence in a QThread self.test_sequence_thread = QThread() self.test_sequence_worker = TestSequenceWorker( self.dev, test_current, self.charge_cutoff, self.discharge_cutoff, self.rest_time, self.continuous_mode_check.isChecked(), self # Pass reference to main window for callbacks ) self.test_sequence_worker.moveToThread(self.test_sequence_thread) # Connect signals self.test_sequence_worker.update_phase.connect(self.update_test_phase) self.test_sequence_worker.update_status.connect(self.status_bar.showMessage) self.test_sequence_worker.test_completed.connect(self.finalize_test) self.test_sequence_worker.error_occurred.connect(self.handle_test_error) self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit) self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater) self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater) # Start the thread and the worker's run method self.test_sequence_thread.start() QTimer.singleShot(0, self.test_sequence_worker.run) # Start capacity calculation timer if not already running if not self.status_timer.isActive(): self.status_timer.start(1000) except Exception as e: QMessageBox.critical(self, "Error", str(e)) # Ensure buttons are in correct state if error occurs self.start_button.setEnabled(True) self.stop_button.setEnabled(False) def create_cycle_log_file(self): """Create a new log file for the current cycle""" try: # Close previous file if exists if hasattr(self, 'current_cycle_file') and self.current_cycle_file: try: self.current_cycle_file.close() except Exception as e: print(f"Error closing previous log file: {e}") # Ensure log directory exists os.makedirs(self.log_dir, exist_ok=True) if not os.access(self.log_dir, os.W_OK): QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}") return False # Generate base filename on first cycle if not hasattr(self, 'base_filename'): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}") # Find next available cycle number cycle_num = 1 while os.path.exists(f"{self.base_filename}_{cycle_num}.csv"): cycle_num += 1 self.filename = f"{self.base_filename}_{cycle_num}.csv" # Open new file try: self.current_cycle_file = open(self.filename, 'w', newline='') # Write header with test parameters test_current = self.c_rate * self.capacity test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" self.current_cycle_file.write(f"# ADALM1000 Battery Test Log - Cycle {cycle_num}\n") self.current_cycle_file.write(f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") self.current_cycle_file.write(f"# Battery Capacity: {self.capacity} Ah\n") self.current_cycle_file.write(f"# Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n") self.current_cycle_file.write(f"# Charge Cutoff: {self.charge_cutoff} V\n") self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n") self.current_cycle_file.write(f"# Rest Time: {self.rest_time} hours\n") self.current_cycle_file.write(f"# Test Conditions/Chemistry: {test_conditions}\n") self.current_cycle_file.write("#\n") # Write data header self.log_writer = csv.writer(self.current_cycle_file) self.log_writer.writerow([ "Time(s)", "Voltage(V)", "Current(A)", "Phase", "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", "Coulomb_Eff(%)", "Cycle" ]) self.log_buffer = [] return True except Exception as e: QMessageBox.critical(self, "Error", f"Failed to create log file: {e}") return False except Exception as e: print(f"Error in create_cycle_log_file: {e}") return False def format_time(self, seconds): """Convert seconds to hh:mm:ss format""" hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = int(seconds % 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def stop_test(self): """Request immediate stop of the test""" if not self.test_running: return self.request_stop = True self.test_running = False self.measuring = False self.test_phase = "Idle" self.phase_label.setText(self.test_phase) # Stop test sequence worker if it exists and is not already deleted if hasattr(self, 'test_sequence_worker'): try: if not sip.isdeleted(self.test_sequence_worker): self.test_sequence_worker.stop() except: pass # Reset device channels if hasattr(self, 'dev'): try: self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].mode = pysmu.Mode.HI_Z except Exception as e: print(f"Error resetting device: {e}") # Clear all data buffers self.time_data.clear() self.voltage_data.clear() self.current_data.clear() self.phase_data.clear() # Reset capacities self.capacity_ah = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 QApplication.processEvents() time.sleep(0.1) # Reset plot self.reset_plot() # Update UI self.status_bar.showMessage("Test stopped - Ready for new test") self.stop_button.setEnabled(False) self.start_button.setEnabled(True) def finalize_test(self): """Final cleanup after test completes or is stopped""" try: # 1. Stop any active measurement or test operations self.measuring = False self.test_running = False # 2. Reset device to safe state if hasattr(self, 'dev'): try: self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].mode = pysmu.Mode.HI_Z except Exception as e: print(f"Error resetting device in finalize: {e}") # 3. Clean up test sequence thread safely if hasattr(self, 'test_sequence_thread'): try: # Check if thread is still running if self.test_sequence_thread.isRunning(): # First try to stop the worker if it exists if hasattr(self, 'test_sequence_worker'): try: self.test_sequence_worker.stop() except RuntimeError: pass # Already deleted # Quit the thread self.test_sequence_thread.quit() self.test_sequence_thread.wait(500) except RuntimeError: pass # Already deleted except Exception as e: print(f"Error stopping test sequence thread: {e}") finally: # Only try to delete if the object still exists if hasattr(self, 'test_sequence_worker'): try: if not sip.isdeleted(self.test_sequence_worker): self.test_sequence_worker.deleteLater() except: pass # Remove references if hasattr(self, 'test_sequence_thread'): try: if not sip.isdeleted(self.test_sequence_thread): self.test_sequence_thread.deleteLater() except: pass finally: if hasattr(self, 'test_sequence_thread'): del self.test_sequence_thread # 4. Finalize log file test_current = self.c_rate * self.capacity test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None: try: # Write any buffered data if hasattr(self, 'log_buffer') and self.log_buffer: self.log_writer.writerows(self.log_buffer) self.log_buffer.clear() # Write test summary self.current_cycle_file.write("\n# TEST SUMMARY\n") self.current_cycle_file.write(f"# Test Parameters:\n") self.current_cycle_file.write(f"# - Battery Capacity: {self.capacity} Ah\n") self.current_cycle_file.write(f"# - Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n") self.current_cycle_file.write(f"# - Charge Cutoff: {self.charge_cutoff} V\n") self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n") self.current_cycle_file.write(f"# - Test Conditions: {test_conditions}\n") self.current_cycle_file.write(f"# Results:\n") self.current_cycle_file.write(f"# - Cycles Completed: {self.cycle_count}\n") self.current_cycle_file.write(f"# - Final Discharge Capacity: {self.capacity_ah:.4f} Ah\n") self.current_cycle_file.write(f"# - Final Charge Capacity: {self.charge_capacity:.4f} Ah\n") self.current_cycle_file.write(f"# - Coulombic Efficiency: {self.coulomb_efficiency:.1f}%\n") self.current_cycle_file.close() except Exception as e: print(f"Error closing log file: {e}") finally: self.current_cycle_file = None # 5. Reset UI and state self.request_stop = False self.start_button.setEnabled(True) self.stop_button.setEnabled(False) # 6. Show completion message if test wasn't stopped by user if not self.request_stop: message = ( f"Test completed | " f"Cycle {self.cycle_count} | " f"Capacity: {self.capacity_ah:.3f}Ah | " f"Efficiency: {self.coulomb_efficiency:.1f}%" ) self.status_bar.showMessage(message) QMessageBox.information( self, "Test Completed", f"Test completed successfully.\n\n" f"Test Parameters:\n" f"- Capacity: {self.capacity} Ah\n" f"- Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n" f"- Charge Cutoff: {self.charge_cutoff} V\n" f"- Discharge Cutoff: {self.discharge_cutoff} V\n" f"- Conditions: {test_conditions}\n\n" f"Results:\n" f"- Cycles: {self.cycle_count}\n" f"- Discharge capacity: {self.capacity_ah:.3f}Ah\n" f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%" ) except Exception as e: print(f"Error in finalize_test: {e}") import traceback traceback.print_exc() # Ensure we don't leave the UI in a locked state self.start_button.setEnabled(True) self.stop_button.setEnabled(False) self.status_bar.showMessage("Error during test finalization") def reset_plot(self): """Completely reset the plot - clears all data and visuals""" # 1. Clear line data self.line_voltage.set_data([], []) self.line_current.set_data([], []) # 2. Clear data buffers self.time_data.clear() self.voltage_data.clear() self.current_data.clear() self.phase_data.clear() # 3. Reset axes with appropriate ranges voltage_padding = 0.2 min_voltage = max(0, self.discharge_cutoff - voltage_padding) max_voltage = self.charge_cutoff + voltage_padding self.ax.set_xlim(0, 10) # Reset X axis self.ax.set_ylim(min_voltage, max_voltage) # Reset twin axis (current) current_padding = 0.05 test_current = self.c_rate * self.capacity max_current = test_current * 1.5 self.ax2.set_xlim(0, 10) self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) # 4. Clear any matplotlib internal caches self.fig.canvas.draw_idle() self.fig.canvas.flush_events() # 5. Force immediate redraw self.canvas.draw() def write_cycle_summary(self): """Write cycle summary to the current cycle's log file""" if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file: return summary_line = ( f"Cycle {self.cycle_count} Summary - " f"Discharge={self.capacity_ah:.4f}Ah, " f"Charge={self.charge_capacity:.4f}Ah, " f"Efficiency={self.coulomb_efficiency:.1f}%" ) try: if self.log_buffer: self.log_writer.writerows(self.log_buffer) self.log_buffer.clear() self.current_cycle_file.write(summary_line + "\n") self.current_cycle_file.flush() except Exception as e: print(f"Error writing cycle summary: {e}") def update_plot(self): """More reliable plotting with better error handling""" try: # Create local copies safely with self.plot_mutex: if not self.time_data or not self.voltage_data or not self.current_data: return if len(self.time_data) != len(self.voltage_data) or len(self.time_data) != len(self.current_data): # Find the minimum length to avoid mismatch min_len = min(len(self.time_data), len(self.voltage_data), len(self.current_data)) x_data = np.array(self.time_data[-min_len:]) y1_data = np.array(self.voltage_data[-min_len:]) y2_data = np.array(self.current_data[-min_len:]) else: x_data = np.array(self.time_data) y1_data = np.array(self.voltage_data) y2_data = np.array(self.current_data) # Update plot data self.line_voltage.set_data(x_data, y1_data) self.line_current.set_data(x_data, y2_data) # Auto-scale when needed if len(x_data) > 0 and x_data[-1] > self.ax.get_xlim()[1] * 0.8: self.auto_scale_axes() # Force redraw self.canvas.draw_idle() except Exception as e: print(f"Plot update error: {e}") import traceback traceback.print_exc() # Reset plot on error with self.plot_mutex: self.line_voltage.set_data([], []) self.line_current.set_data([], []) self.canvas.draw_idle() def auto_scale_axes(self): """Auto-scale plot axes with appropriate padding and strict boundaries""" if not self.time_data: return min_time = 0 max_time = self.time_data[-1] current_xlim = self.ax.get_xlim() if max_time > current_xlim[1] * 0.95: new_max = max_time * 1.05 self.ax.set_xlim(min_time, new_max) self.ax2.set_xlim(min_time, new_max) voltage_padding = 0.2 if self.voltage_data: min_voltage = max(0, min(self.voltage_data) - voltage_padding) max_voltage = min(5.0, max(self.voltage_data) + voltage_padding) current_ylim = self.ax.get_ylim() if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1): self.ax.set_ylim(min_voltage, max_voltage) current_padding = 0.05 if self.current_data: min_current = max(-0.25, min(self.current_data) - current_padding) max_current = min(0.25, max(self.current_data) + current_padding) current_ylim2 = self.ax2.get_ylim() if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02): self.ax2.set_ylim(min_current, max_current) @pyqtSlot(str) def handle_device_error(self, error): """Handle device connection errors""" error_msg = str(error) print(f"Device error: {error_msg}") if hasattr(self, 'session'): try: if self.session_active: self.session.end() del self.session except Exception as e: print(f"Error cleaning up session: {e}") self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;") self.connection_label.setText("Disconnected") self.status_bar.showMessage(f"Device error: {error_msg}") self.session_active = False self.test_running = False self.continuous_mode = False self.measuring = False self.start_button.setEnabled(False) self.stop_button.setEnabled(False) self.time_data.clear() self.voltage_data.clear() self.current_data.clear() @pyqtSlot(str) def update_test_phase(self, phase_text): """Update the test phase display""" self.test_phase = phase_text self.phase_label.setText(phase_text) # Update log if available if hasattr(self, 'log_buffer'): current_time = time.time() - self.start_time self.log_buffer.append([ f"{current_time:.3f}", "", "", phase_text, f"{self.capacity_ah:.4f}", f"{self.charge_capacity:.4f}", f"{self.coulomb_efficiency:.1f}" if hasattr(self, 'coulomb_efficiency') else "0.0", f"{self.cycle_count}" ]) @pyqtSlot(str) def handle_test_error(self, error_msg): """Handle errors from the test sequence with complete cleanup""" try: # 1. Notify user QMessageBox.critical(self, "Test Error", f"An error occurred:\n{error_msg}\n\nAttempting to recover...") # 2. Stop all operations self.stop_test() # 3. Reset UI elements if hasattr(self, 'line_voltage'): try: self.line_voltage.set_data([], []) self.line_current.set_data([], []) self.ax.set_xlim(0, 1) self.ax2.set_xlim(0, 1) self.canvas.draw() except Exception as plot_error: print(f"Plot reset error: {plot_error}") # 4. Update status self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...") self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") # 5. Attempt recovery QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect except Exception as e: print(f"Error in error handler: {e}") # Fallback - restart application? QMessageBox.critical(self, "Fatal Error", "The application needs to restart due to an unrecoverable error") QTimer.singleShot(1000, self.close) def attempt_reconnect(self): """Attempt to reconnect automatically""" QMessageBox.critical( self, "Device Connection Error", "Could not connect to ADALM1000\n\n" "1. Check USB cable connection\n" "2. The device will attempt to reconnect automatically" ) QTimer.singleShot(1000, self.reconnect_device) def reconnect_device(self): """Reconnect the device with proper cleanup""" self.status_bar.showMessage("Attempting to reconnect...") if hasattr(self, 'session'): try: if self.session_active: self.session.end() del self.session except: pass self.test_running = False self.continuous_mode = False self.measuring = False if hasattr(self, 'measurement_thread'): self.measurement_thread.stop() time.sleep(1.5) try: self.init_device() if self.session_active: self.status_bar.showMessage("Reconnected successfully") return except Exception as e: print(f"Reconnect failed: {e}") self.status_bar.showMessage("Reconnect failed - will retry...") QTimer.singleShot(2000, self.reconnect_device) def closeEvent(self, event): """Clean up on window close""" self.test_running = False self.measuring = False self.session_active = False # Stop measurement thread if hasattr(self, 'measurement_thread'): self.measurement_thread.stop() # Stop test sequence thread if hasattr(self, 'test_sequence_thread'): if hasattr(self, 'test_sequence_worker'): self.test_sequence_worker.stop() self.test_sequence_thread.quit() self.test_sequence_thread.wait(500) # Clean up device session if hasattr(self, 'session') and self.session: try: self.session.end() except Exception as e: print(f"Error ending session: {e}") event.accept() if __name__ == "__main__": app = QApplication([]) try: window = BatteryTester() window.show() app.exec_() except Exception as e: QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}")