# -*- coding: utf-8 -*- import os import time import csv import threading from datetime import datetime import numpy as np import matplotlib matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure from collections import deque from queue import Queue, Full, Empty from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog, QComboBox) from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread from PyQt5 import sip import pysmu class DeviceDisconnectedError(Exception): pass class MeasurementThread(QThread): update_signal = pyqtSignal(float, float, float) error_signal = pyqtSignal(str) def __init__(self, device, interval=0.1): super().__init__() self.device = device self.interval = interval self._running = False self.filter_window_size = 10 self.voltage_window = [] self.current_window = [] self.start_time = None self.measurement_queue = Queue(maxsize=1) self.current_direction = 1 # 1 for source, -1 for sink def run(self): """Continuous measurement loop""" self._running = True if self.start_time is None: # Nur setzen wenn noch nicht gesetzt self.start_time = time.time() while self._running: try: samples = self.device.read(self.filter_window_size, 500, True) if not samples: raise DeviceDisconnectedError("No samples received") current_time = time.time() - self.start_time # Get voltage from Channel B (HI_Z mode) and current from Channel A raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction # Channel A current with direction # Update filter windows self.voltage_window.append(raw_voltage) self.current_window.append(raw_current) if len(self.voltage_window) > self.filter_window_size: self.voltage_window.pop(0) self.current_window.pop(0) voltage = np.mean(self.voltage_window) current = np.mean(self.current_window) # Validate measurements if voltage is None or not (-1.0 <= voltage <= 6.0): raise ValueError(f"Invalid voltage: {voltage}V") if not (-0.25 <= current <= 0.25): raise ValueError(f"Invalid current: {current}A") # Emit update self.update_signal.emit(voltage, current, current_time) # Store measurement try: self.measurement_queue.put_nowait((voltage, current)) except Full: pass time.sleep(max(0.05, self.interval)) except Exception as e: self.error_signal.emit(f"Read error: {str(e)}") time.sleep(1) continue def set_direction(self, direction): """Set current direction (1 for source, -1 for sink)""" self.current_direction = direction def stop(self): self._running = False self.wait(500) class TestSequenceWorker(QObject): finished = pyqtSignal() update_phase = pyqtSignal(str) update_status = pyqtSignal(str) test_completed = pyqtSignal() error_occurred = pyqtSignal(str) def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent): super().__init__() self.device = device self.test_current = test_current self.charge_cutoff = charge_cutoff self.discharge_cutoff = discharge_cutoff self.rest_time = rest_time * 3600 # Convert hours to seconds self.continuous_mode = continuous_mode self.parent = parent self._running = True self.voltage_timeout = 0.5 # seconds def get_latest_measurement(self): """Thread-safe measurement reading with timeout""" try: return self.parent.measurement_thread.measurement_queue.get( timeout=self.voltage_timeout ) except Empty: return (None, None) # Return tuple for unpacking def charge_phase(self): """Handle the battery charging phase""" self.update_phase.emit("Charge") self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.4f}A") try: # Configure channels - Channel A sources current, Channel B measures voltage self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV self.device.channels['A'].constant(self.test_current) self.parent.measurement_thread.set_direction(1) # Source current # Small delay to allow current to stabilize time.sleep(0.1) while self._running: voltage, current = self.get_latest_measurement() if voltage is None: continue # Update parent's data for logging/display with self.parent.plot_mutex: if len(self.parent.voltage_data) > 0: self.parent.voltage_data[-1] = voltage self.parent.current_data[-1] = current if voltage >= self.charge_cutoff: break time.sleep(0.1) finally: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) def discharge_phase(self): """Handle the battery discharging phase""" voltage, _ = self.get_latest_measurement() if voltage is not None and voltage <= self.discharge_cutoff: self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)") return self.update_phase.emit("Discharge") self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A") try: # Configure channels - Channel A sinks current, Channel B measures voltage self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV self.device.channels['A'].constant(-self.test_current) self.parent.measurement_thread.set_direction(-1) # Sink current # Small delay to allow current to stabilize time.sleep(0.1) while self._running: voltage, current = self.get_latest_measurement() if voltage is None: continue # Update parent's data for logging/display with self.parent.plot_mutex: if len(self.parent.voltage_data) > 0: self.parent.voltage_data[-1] = voltage self.parent.current_data[-1] = current if voltage <= self.discharge_cutoff: break time.sleep(0.1) finally: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) def rest_phase(self, phase_name): """Handle rest period between phases""" self.update_phase.emit(f"Resting ({phase_name})") rest_end = time.time() + self.rest_time while time.time() < rest_end and self._running: time_left = max(0, rest_end - time.time()) self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min") time.sleep(1) def stop(self): """Request the thread to stop""" self._running = False try: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) self.device.channels['B'].mode = pysmu.Mode.HI_Z except Exception as e: print(f"Error stopping device: {e}") def run(self): """Main test sequence loop""" try: first_cycle = True # Ensure at least one cycle runs while (self._running and (self.parent.continuous_mode_check.isChecked() or first_cycle)): self.parent.request_stop = False self.parent.cycle_count += 1 first_cycle = False # Only True for the first cycle # 1. Charge phase (constant current) self.charge_phase() if not self._running or self.parent.request_stop: break # 2. Rest period after charge self.rest_phase("Post-Charge") if not self._running or self.parent.request_stop: break # 3. Discharge phase (capacity measurement) self.discharge_phase() if not self._running or self.parent.request_stop: break # 4. Rest period after discharge (only if not stopping) if self._running and not self.parent.request_stop: self.rest_phase("Post-Discharge") # Calculate Coulomb efficiency if not stopping if not self.parent.request_stop and self.parent.charge_capacity > 0: self.parent.coulomb_efficiency = ( self.parent.capacity_ah / self.parent.charge_capacity ) * 100 # Test completed self.test_completed.emit() except Exception as e: self.error_occurred.emit(f"Test sequence error: {str(e)}") finally: self.finished.emit() class DischargeWorker(QObject): finished = pyqtSignal() update_status = pyqtSignal(str) test_completed = pyqtSignal() error_occurred = pyqtSignal(str) def __init__(self, device, test_current, discharge_cutoff, parent): super().__init__() self.device = device self.test_current = test_current self.discharge_cutoff = discharge_cutoff self.parent = parent self._running = True self.voltage_timeout = 0.5 # seconds def get_latest_measurement(self): """Thread-safe measurement reading with timeout""" try: return self.parent.measurement_thread.measurement_queue.get( timeout=self.voltage_timeout ) except Empty: return (None, None) # Return tuple for unpacking def discharge_phase(self): """Handle the battery discharging phase""" voltage, _ = self.get_latest_measurement() if voltage is not None and voltage <= self.discharge_cutoff: self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)") return self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A") try: # Configure channels - Channel A sinks current, Channel B measures voltage self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV self.device.channels['A'].constant(-self.test_current) self.parent.measurement_thread.set_direction(-1) # Sink current # Small delay to allow current to stabilize time.sleep(0.1) while self._running: voltage, current = self.get_latest_measurement() if voltage is None: continue # Update parent's data for logging/display with self.parent.plot_mutex: if len(self.parent.voltage_data) > 0: self.parent.voltage_data[-1] = voltage self.parent.current_data[-1] = current if voltage <= self.discharge_cutoff: break time.sleep(0.1) finally: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) def stop(self): """Request the thread to stop""" self._running = False try: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) self.device.channels['B'].mode = pysmu.Mode.HI_Z except Exception as e: print(f"Error stopping device: {e}") def run(self): """Main discharge sequence""" try: self.parent.request_stop = False self.parent.cycle_count = 1 # Only one discharge cycle # Discharge phase self.discharge_phase() if not self._running or self.parent.request_stop: return # Test completed self.test_completed.emit() except Exception as e: self.error_occurred.emit(f"Discharge error: {str(e)}") finally: self.finished.emit() class ChargeWorker(QObject): finished = pyqtSignal() update_status = pyqtSignal(str) test_completed = pyqtSignal() error_occurred = pyqtSignal(str) def __init__(self, device, test_current, charge_cutoff, parent): super().__init__() self.device = device self.test_current = test_current self.charge_cutoff = charge_cutoff self.parent = parent self._running = True def run(self): """Main charge sequence""" try: self.parent.measurement_thread.set_direction(1) # Source current # Configure channels - Channel A sources current, Channel B measures voltage self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV self.device.channels['A'].constant(self.test_current) time.sleep(0.1) # Allow current to stabilize while self._running: voltage, current = self.parent.get_latest_measurement() if voltage is None: continue # Update parent's data for logging/display with self.parent.plot_mutex: if len(self.parent.voltage_data) > 0: self.parent.voltage_data[-1] = voltage self.parent.current_data[-1] = current if voltage >= self.charge_cutoff: break time.sleep(0.1) self.test_completed.emit() except Exception as e: self.error_occurred.emit(f"Charge error: {str(e)}") finally: self.device.channels['A'].constant(0) self.finished.emit() def stop(self): """Request the thread to stop""" self._running = False try: self.device.channels['A'].constant(0) except Exception as e: print(f"Error stopping charge: {e}") class BatteryTester(QMainWindow): def __init__(self): self.plot_mutex = threading.Lock() super().__init__() self.last_logged_phase = None # Color scheme self.bg_color = "#2E3440" self.fg_color = "#D8DEE9" self.accent_color = "#5E81AC" self.warning_color = "#BF616A" self.success_color = "#A3BE8C" # Device and measurement state self.session_active = False self.measuring = False self.test_running = False self.continuous_mode = False self.request_stop = False self.interval = 0.1 self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) # Data buffers max_data_points = 36000 # Define this first self.time_data = deque(maxlen=max_data_points) self.voltage_data = deque(maxlen=max_data_points) self.current_data = deque(maxlen=max_data_points) self.max_points_to_keep = 10000 self.display_time_data = deque(maxlen=self.max_points_to_keep) self.display_voltage_data = deque(maxlen=self.max_points_to_keep) self.display_current_data = deque(maxlen=self.max_points_to_keep) self.aggregation_buffer = { 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0 } self.phase_data = deque() self.downsample_factor = 1 # Initial kein Downsampling self.downsample_counter = 0 # Initialize all measurement variables self.capacity_ah = 0.0 self.energy = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 self.start_time = time.time() self.last_update_time = self.start_time # Initialize UI and device self.setup_ui() self.init_device() # Set window properties self.setWindowTitle("ADALM1000 - Battery Tester (Multi-Mode)") self.resize(1000, 800) self.setMinimumSize(800, 700) # Status update timer self.status_timer = QTimer() self.status_timer.timeout.connect(self.update_status_and_plot) self.status_timer.start(1000) #every second def setup_ui(self): """Configure the user interface""" # Main widget and layout self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.main_layout = QVBoxLayout(self.central_widget) self.main_layout.setContentsMargins(10, 10, 10, 10) # Mode and device selection mode_frame = QFrame() mode_frame.setFrameShape(QFrame.StyledPanel) mode_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") mode_layout = QHBoxLayout(mode_frame) # Test mode selection self.mode_label = QLabel("Test Mode:") self.mode_label.setStyleSheet(f"color: {self.fg_color};") mode_layout.addWidget(self.mode_label) self.mode_combo = QComboBox() self.mode_combo.addItems(["Live Monitoring", "Discharge Test", "Charge Test", "Cycle Test"]) self.mode_combo.setStyleSheet(f""" QComboBox {{ background-color: #3B4252; color: {self.fg_color}; border: 1px solid #4C566A; border-radius: 3px; padding: 2px; }} """) self.mode_combo.currentTextChanged.connect(self.change_mode) mode_layout.addWidget(self.mode_combo, 1) # Device selection self.device_label = QLabel("ADALM1000 Device:") self.device_label.setStyleSheet(f"color: {self.fg_color};") mode_layout.addWidget(self.device_label) self.device_combo = QComboBox() self.device_combo.setStyleSheet(f""" QComboBox {{ background-color: #3B4252; color: {self.fg_color}; border: 1px solid #4C566A; border-radius: 3px; padding: 2px; }} """) self.device_combo.currentIndexChanged.connect(self.change_device) mode_layout.addWidget(self.device_combo, 1) self.main_layout.addWidget(mode_frame) # Header area header_frame = QFrame() header_frame.setFrameShape(QFrame.NoFrame) header_layout = QHBoxLayout(header_frame) header_layout.setContentsMargins(0, 0, 0, 0) self.title_label = QLabel("ADALM1000 Battery Tester") self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};") header_layout.addWidget(self.title_label, 1) # Status indicator self.status_light = QLabel() self.status_light.setFixedSize(20, 20) self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") header_layout.addWidget(self.status_light) self.connection_label = QLabel("Disconnected") header_layout.addWidget(self.connection_label) # Reconnect button self.reconnect_btn = QPushButton("Reconnect") self.reconnect_btn.clicked.connect(self.reconnect_device) header_layout.addWidget(self.reconnect_btn) self.main_layout.addWidget(header_frame) # Measurement display display_frame = QFrame() display_frame.setFrameShape(QFrame.StyledPanel) display_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") display_layout = QGridLayout(display_frame) # Measurement values - common for all modes measurement_labels = [ ("Voltage", "V"), ("Current", "A"), ("Test Phase", ""), ("Elapsed Time", "s"), ("Capacity", "Ah"), ("Power", "W"), ("Energy", "Wh"), ("Cycle Count", ""), ("Battery Temp", "°C") ] for i, (label, unit) in enumerate(measurement_labels): row = i // 3 col = (i % 3) * 3 lbl = QLabel(f"{label}:") lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;") display_layout.addWidget(lbl, row, col) value_lbl = QLabel("0.000") value_lbl.setStyleSheet(f""" color: {self.fg_color}; font-weight: bold; font-size: 12px; min-width: 60px; """) display_layout.addWidget(value_lbl, row, col + 1) if unit: unit_lbl = QLabel(unit) unit_lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;") display_layout.addWidget(unit_lbl, row, col + 2) for i in range(9): display_layout.setColumnStretch(i, 1 if i % 3 == 1 else 0) self.voltage_label = display_layout.itemAtPosition(0, 1).widget() self.current_label = display_layout.itemAtPosition(0, 4).widget() self.phase_label = display_layout.itemAtPosition(0, 7).widget() self.time_label = display_layout.itemAtPosition(1, 1).widget() self.capacity_label = display_layout.itemAtPosition(1, 4).widget() self.power_label = display_layout.itemAtPosition(1, 7).widget() self.energy_label = display_layout.itemAtPosition(2, 1).widget() self.cycle_label = display_layout.itemAtPosition(2, 4).widget() self.temp_label = display_layout.itemAtPosition(2, 7).widget() self.main_layout.addWidget(display_frame) # Control area controls_frame = QFrame() controls_frame.setFrameShape(QFrame.NoFrame) controls_layout = QHBoxLayout(controls_frame) controls_layout.setContentsMargins(0, 0, 0, 0) # Parameters frame self.params_frame = QFrame() self.params_frame.setFrameShape(QFrame.StyledPanel) self.params_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") self.params_layout = QGridLayout(self.params_frame) # Common parameters self.capacity = 0.2 self.capacity_label_input = QLabel("Battery Capacity (Ah):") self.capacity_label_input.setStyleSheet(f"color: {self.fg_color};") self.params_layout.addWidget(self.capacity_label_input, 0, 0) self.capacity_input = QLineEdit("0.2") self.capacity_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.capacity_input.setFixedWidth(60) self.params_layout.addWidget(self.capacity_input, 0, 1) # C-rate for test self.c_rate = 0.1 self.c_rate_label = QLabel("Test C-rate:") self.c_rate_label.setStyleSheet(f"color: {self.fg_color};") self.params_layout.addWidget(self.c_rate_label, 1, 0) self.c_rate_input = QLineEdit("0.1") self.c_rate_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.c_rate_input.setFixedWidth(40) self.params_layout.addWidget(self.c_rate_input, 1, 1) c_rate_note = QLabel("(e.g., 0.2 for C/5)") c_rate_note.setStyleSheet(f"color: {self.fg_color};") self.params_layout.addWidget(c_rate_note, 1, 2) # Discharge cutoff (used in Discharge and Cycle modes) self.discharge_cutoff = 0.9 self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):") self.discharge_cutoff_label.setStyleSheet(f"color: {self.fg_color};") self.params_layout.addWidget(self.discharge_cutoff_label, 2, 0) self.discharge_cutoff_input = QLineEdit("0.9") self.discharge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.discharge_cutoff_input.setFixedWidth(60) self.params_layout.addWidget(self.discharge_cutoff_input, 2, 1) # Charge cutoff (only for Cycle mode) self.charge_cutoff = 1.43 self.charge_cutoff_label = QLabel("Charge Cutoff (V):") self.charge_cutoff_label.setStyleSheet(f"color: {self.fg_color};") self.params_layout.addWidget(self.charge_cutoff_label, 3, 0) self.charge_cutoff_input = QLineEdit("1.43") self.charge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.charge_cutoff_input.setFixedWidth(60) self.params_layout.addWidget(self.charge_cutoff_input, 3, 1) self.charge_cutoff_label.hide() self.charge_cutoff_input.hide() # Rest time (only for Cycle mode) self.rest_time = 0.25 self.rest_time_label = QLabel("Rest Time (hours):") self.rest_time_label.setStyleSheet(f"color: {self.fg_color};") self.params_layout.addWidget(self.rest_time_label, 4, 0) self.rest_time_input = QLineEdit("0.25") self.rest_time_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.rest_time_input.setFixedWidth(60) self.params_layout.addWidget(self.rest_time_input, 4, 1) self.rest_time_label.hide() self.rest_time_input.hide() # Test conditions input self.test_conditions_label = QLabel("Test Conditions/Chemistry:") self.test_conditions_label.setStyleSheet(f"color: {self.fg_color};") self.params_layout.addWidget(self.test_conditions_label, 5, 0) self.test_conditions_input = QLineEdit("") self.test_conditions_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") self.test_conditions_input.setFixedWidth(120) self.params_layout.addWidget(self.test_conditions_input, 5, 1) controls_layout.addWidget(self.params_frame, 1) # Button frame button_frame = QFrame() button_frame.setFrameShape(QFrame.NoFrame) button_layout = QVBoxLayout(button_frame) button_layout.setContentsMargins(0, 0, 0, 0) # Start/Stop buttons self.start_button = QPushButton("START") self.start_button.setStyleSheet(f""" QPushButton {{ background-color: {self.accent_color}; color: {self.fg_color}; font-weight: bold; padding: 6px; border-radius: 4px; }} QPushButton:disabled {{ background-color: #4C566A; color: #D8DEE9; }} """) self.start_button.clicked.connect(self.start_test) button_layout.addWidget(self.start_button) self.stop_button = QPushButton("STOP") self.stop_button.setStyleSheet(f""" QPushButton {{ background-color: {self.warning_color}; color: {self.fg_color}; font-weight: bold; padding: 6px; border-radius: 4px; }} QPushButton:disabled {{ background-color: #4C566A; color: #D8DEE9; }} """) self.stop_button.clicked.connect(self.stop_test) self.stop_button.setEnabled(False) button_layout.addWidget(self.stop_button) # Continuous mode checkbox (only for Cycle mode) self.continuous_mode_check = QCheckBox("Continuous Mode") self.continuous_mode_check.setChecked(True) self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};") button_layout.addWidget(self.continuous_mode_check) self.continuous_mode_check.stateChanged.connect(self.handle_continuous_mode_change) self.continuous_mode_check.hide() # Record button for Live mode self.record_button = QPushButton("Start Recording") self.record_button.setCheckable(True) self.record_button.setStyleSheet(f""" QPushButton {{ background-color: {self.success_color}; color: {self.fg_color}; font-weight: bold; padding: 6px; border-radius: 4px; }} QPushButton:checked {{ background-color: {self.warning_color}; }} """) self.record_button.clicked.connect(self.toggle_recording) button_layout.addWidget(self.record_button) self.record_button.hide() controls_layout.addWidget(button_frame) self.main_layout.addWidget(controls_frame) # Plot area self.setup_plot() # Status bar self.status_bar = self.statusBar() self.status_bar.setStyleSheet(f"color: {self.fg_color};") self.status_bar.showMessage("Ready") # Apply dark theme self.setStyleSheet(f""" QMainWindow {{ background-color: {self.bg_color}; }} QLabel {{ color: {self.fg_color}; }} QLineEdit {{ background-color: #3B4252; color: {self.fg_color}; border: 1px solid #4C566A; border-radius: 3px; padding: 2px; }} """) # Set initial mode self.current_mode = "Live Monitoring" self.mode_combo.setCurrentText(self.current_mode) self.change_mode(self.current_mode) # Initialize UI for live mode def change_mode(self, mode_name): """Change between different test modes""" self.current_mode = mode_name self.stop_test() # Stop any current operation # Hide all optional parameters first self.charge_cutoff_label.hide() self.charge_cutoff_input.hide() self.discharge_cutoff_label.hide() self.discharge_cutoff_input.hide() self.rest_time_label.hide() self.rest_time_input.hide() self.continuous_mode_check.hide() self.record_button.hide() # Show mode-specific parameters if mode_name == "Cycle Test": self.charge_cutoff_label.show() self.charge_cutoff_input.show() self.discharge_cutoff_label.show() self.discharge_cutoff_input.show() self.rest_time_label.show() self.rest_time_input.show() self.continuous_mode_check.show() self.start_button.setText("START CYCLE TEST") self.start_button.setEnabled(True) # Explicitly enable elif mode_name == "Discharge Test": self.discharge_cutoff_label.show() self.discharge_cutoff_input.show() self.start_button.setText("START DISCHARGE") self.start_button.setEnabled(True) # Explicitly enable elif mode_name == "Charge Test": self.charge_cutoff_label.show() self.charge_cutoff_input.show() self.start_button.setText("START CHARGE") self.start_button.setEnabled(True) # Explicitly enable elif mode_name == "Live Monitoring": self.record_button.show() self.start_button.setText("START MONITORING") # Only enable start button if device is connected self.start_button.setEnabled(self.session_active) # Reset measurement state self.reset_test() self.status_bar.showMessage(f"Mode changed to {mode_name}") def reset_test(self): """Reset test state without stopping measurement""" # Reset Downsampling self.downsample_factor = 1 self.downsample_counter = 0 # Clear all data buffers with self.plot_mutex: self.time_data.clear() self.voltage_data.clear() self.current_data.clear() if hasattr(self, 'phase_data'): self.phase_data.clear() # Also clear display buffers if hasattr(self, 'display_time_data'): self.display_time_data.clear() self.display_voltage_data.clear() self.display_current_data.clear() # Reset aggregation buffer self.aggregation_buffer = { 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0 } # Clear measurement thread buffers if it exists if hasattr(self, 'measurement_thread'): self.measurement_thread.voltage_window.clear() self.measurement_thread.current_window.clear() with self.measurement_thread.measurement_queue.mutex: self.measurement_thread.measurement_queue.queue.clear() self.measurement_thread.start_time = time.time() # Reset capacities and timing self.start_time = time.time() self.last_update_time = self.start_time self.capacity_ah = 0.0 self.energy = 0.0 if hasattr(self, 'charge_capacity'): self.charge_capacity = 0.0 if hasattr(self, 'coulomb_efficiency'): self.coulomb_efficiency = 0.0 # Reset plot self.reset_plot() # Update UI self.phase_label.setText("Idle") if hasattr(self, 'test_phase'): self.test_phase = "Idle" def toggle_recording(self): """Toggle data recording in Live Monitoring mode""" if self.record_button.isChecked(): # Start recording try: # Reset previous data self.reset_test() # Reset measurement timing if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() if self.create_cycle_log_file(): self.record_button.setText("Stop Recording") self.status_bar.showMessage("Live recording started") # Ensure monitoring is running if not self.test_running: self.start_live_monitoring() else: self.record_button.setChecked(False) self.current_cycle_file = None except Exception as e: print(f"Error starting recording: {e}") self.record_button.setChecked(False) self.current_cycle_file = None QMessageBox.critical(self, "Error", f"Failed to start recording:\n{str(e)}") else: # Stop recording try: if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None: self.finalize_log_file() self.record_button.setText("Start Recording") self.status_bar.showMessage("Live recording stopped") except Exception as e: print(f"Error stopping recording: {e}") def handle_continuous_mode_change(self, state): """Handle changes to continuous mode checkbox during operation""" if not state and self.test_running: # If unchecked during test self.status_bar.showMessage("Continuous mode disabled - will complete current cycle") self.continuous_mode_check.setStyleSheet(f"color: {self.warning_color};") QTimer.singleShot(2000, lambda: self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")) def setup_plot(self): """Configure the matplotlib plot""" self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color) self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) self.ax = self.fig.add_subplot(111) self.ax.set_facecolor('#3B4252') # Set initial voltage range voltage_padding = 0.2 min_voltage = max(0, 0.9 - voltage_padding) max_voltage = 1.43 + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) # Voltage plot self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) self.ax.set_ylabel("Voltage (V)", color='#00BFFF') self.ax.tick_params(axis='y', labelcolor='#00BFFF') # Current plot (right axis) self.ax2 = self.ax.twinx() current_padding = 0.05 test_current = 0.1 * 0.2 # Default values max_current = test_current * 1.5 self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) self.ax2.set_ylabel("Current (A)", color='r') self.ax2.tick_params(axis='y', labelcolor='r') self.ax.set_xlabel('Time (s)', color=self.fg_color) self.ax.set_title('Battery Test', color=self.fg_color) self.ax.tick_params(axis='x', colors=self.fg_color) self.ax.grid(True, color='#4C566A') # Position legends self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) # Embed plot self.canvas = FigureCanvas(self.fig) self.canvas.setStyleSheet(f"background-color: {self.bg_color};") self.main_layout.addWidget(self.canvas, 1) def init_device(self): """Initialize ADALM1000 with proper device selection and session handling""" try: # Clean up existing session if hasattr(self, 'session'): try: self.session.end() del self.session except Exception as e: print(f"Error cleaning up session: {e}") time.sleep(0.5) # Brief pause for USB re-enumeration # Initialize new session self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) if not self.session.devices: raise DeviceDisconnectedError("No ADALM1000 devices detected") # Populate device selector self.device_combo.clear() for dev in self.session.devices: self.device_combo.addItem(dev.serial) # Select first device by default self.dev = self.session.devices[0] self.device_combo.setCurrentIndex(0) # Configure channels self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].constant(0) # Start session for the selected device device_index = self.session.devices.index(self.dev) self.session.start(device_index) # Update UI self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") self.connection_label.setText(f"Connected: {self.dev.serial}") self.status_bar.showMessage(f"Ready - Device {self.dev.serial}") self.session_active = True self.start_button.setEnabled(True) # Start measurement thread if hasattr(self, 'measurement_thread'): self.measurement_thread.stop() self.measurement_thread.wait(500) self.measurement_thread = MeasurementThread(self.dev, self.interval) self.measurement_thread.update_signal.connect(self.update_measurements) self.measurement_thread.error_signal.connect(self.handle_device_error) self.measurement_thread.start() except Exception as e: self.handle_device_error(str(e)) def change_device(self, index): """Safely switch to another ADALM1000 device""" if not self.session_active or index < 0 or index >= len(self.session.devices): return try: # Stop current operations self.stop_test() # Stop measurement thread if hasattr(self, 'measurement_thread'): self.measurement_thread.stop() self.measurement_thread.wait(500) # Switch to new device self.dev = self.session.devices[index] # Reconfigure channels self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].constant(0) # Restart session for new device device_index = self.session.devices.index(self.dev) self.session.start(device_index) # Update UI self.device_combo.setCurrentIndex(index) self.connection_label.setText(f"Connected: {self.dev.serial}") self.status_bar.showMessage(f"Switched to device {self.dev.serial}") # Restart measurement self.measurement_thread = MeasurementThread(self.dev, self.interval) self.measurement_thread.update_signal.connect(self.update_measurements) self.measurement_thread.error_signal.connect(self.handle_device_error) self.measurement_thread.start() except Exception as e: self.handle_device_error(f"Device switch failed: {str(e)}") @pyqtSlot(float, float, float) def update_measurements(self, voltage, current, current_time): try: # Only store data if in a test or recording if not (self.test_running or self.record_button.isChecked()): return # 1. Originale Daten immer vollständig speichern (für Berechnungen und Logging) with self.plot_mutex: self.time_data.append(current_time) self.voltage_data.append(voltage) self.current_data.append(current) # 2. Downsampling für die Anzeige if not hasattr(self, 'aggregation_buffer'): self.aggregation_buffer = { 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0 } self.aggregation_buffer['time'].append(current_time) self.aggregation_buffer['voltage'].append(voltage) self.aggregation_buffer['current'].append(current) self.aggregation_buffer['count'] += 1 # Nur aggregieren wenn genug Daten oder Zeit vergangen now = time.time() if (self.aggregation_buffer['count'] >= self.downsample_factor or now - self.aggregation_buffer['last_plot_time'] >= 1.0): # Berechne aggregierte Werte (Mittelwert) agg_time = np.mean(self.aggregation_buffer['time']) agg_voltage = np.mean(self.aggregation_buffer['voltage']) agg_current = np.mean(self.aggregation_buffer['current']) # Für die Anzeige verwenden if not hasattr(self, 'display_time_data'): self.display_time_data = deque(maxlen=self.max_points_to_keep) self.display_voltage_data = deque(maxlen=self.max_points_to_keep) self.display_current_data = deque(maxlen=self.max_points_to_keep) self.display_time_data.append(agg_time) self.display_voltage_data.append(agg_voltage) self.display_current_data.append(agg_current) # Reset Buffer self.aggregation_buffer = { 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': now } # 3. Originale Funktionalität für Berechnungen beibehalten self.voltage_label.setText(f"{voltage:.4f}") self.current_label.setText(f"{abs(current):.4f}") self.time_label.setText(self.format_time(current_time)) # Calculate and display power and energy power = voltage * abs(current) self.power_label.setText(f"{power:.4f}") if len(self.time_data) > 1: delta_t = self.time_data[-1] - self.time_data[-2] self.energy += power * delta_t / 3600 # Convert to Wh self.energy_label.setText(f"{self.energy:.4f}") # 4. Auto-Skalierung anpassen if len(self.time_data) > self.max_points_to_keep * 1.5: self.adjust_downsampling() # 5. Plot updates throttled to 10Hz if not hasattr(self, '_last_plot_update'): self._last_plot_update = 0 if now - self._last_plot_update >= 0.1: self._last_plot_update = now QTimer.singleShot(0, self.update_plot) except Exception as e: print(f"Error in update_measurements: {e}") import traceback traceback.print_exc() # Versuche den Aggregationsbuffer zu retten if hasattr(self, 'aggregation_buffer'): agg_buffer = self.aggregation_buffer if agg_buffer['count'] > 0: try: with self.plot_mutex: if not hasattr(self, 'display_time_data'): self.display_time_data = deque(maxlen=self.max_points_to_keep) self.display_voltage_data = deque(maxlen=self.max_points_to_keep) self.display_current_data = deque(maxlen=self.max_points_to_keep) self.display_time_data.append(np.mean(agg_buffer['time'])) self.display_voltage_data.append(np.mean(agg_buffer['voltage'])) self.display_current_data.append(np.mean(agg_buffer['current'])) except: pass self.aggregation_buffer = { 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': time.time() } def adjust_downsampling(self): current_length = len(self.time_data) if current_length > self.max_points_to_keep * 1.5: # Exponentiell erhöhen, aber max. 64 new_factor = min(64, max(1, self.downsample_factor * 2)) elif current_length < self.max_points_to_keep // 2: # Halbieren, aber min. 1 new_factor = max(1, self.downsample_factor // 2) else: return if new_factor != self.downsample_factor: self.downsample_factor = new_factor self.status_bar.showMessage( f"Downsampling: Factor {self.downsample_factor}", 2000) def update_status_and_plot(self): """Combined status and plot update""" self.update_status() self.update_plot() def update_status(self): """Update status information periodically""" now = time.time() # Define 'now' at the start of the method if self.test_running or hasattr(self, 'record_button') and self.record_button.isChecked(): if self.time_data: current_time = self.time_data[-1] if len(self.time_data) > 1: delta_t = self.time_data[-1] - self.time_data[-2] if delta_t > 0: current_current = abs(self.current_data[-1]) self.capacity_ah += current_current * delta_t / 3600 self.capacity_label.setText(f"{self.capacity_ah:.4f}") # Logging (1x per second) if (hasattr(self, 'log_writer') and hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None and not self.current_cycle_file.closed): # Initialize last log time if not exists if not hasattr(self, '_last_log_time'): self._last_log_time = now if self.time_data and (now - self._last_log_time >= 1.0): try: current_time = self.time_data[-1] voltage = self.voltage_data[-1] current = self.current_data[-1] if self.current_mode == "Cycle Test": self.log_writer.writerow([ f"{current_time:.4f}", f"{voltage:.6f}", f"{current:.6f}", self.test_phase, f"{self.capacity_ah:.4f}", f"{self.charge_capacity:.4f}", f"{self.coulomb_efficiency:.1f}", f"{self.cycle_count}" ]) else: self.log_writer.writerow([ f"{current_time:.4f}", f"{voltage:.6f}", f"{current:.6f}", self.test_phase if hasattr(self, 'test_phase') else "Live", f"{self.capacity_ah:.4f}", f"{voltage * current:.4f}", # Power f"{self.energy:.4f}", # Energy f"{self.cycle_count}" if hasattr(self, 'cycle_count') else "1" ]) self.current_cycle_file.flush() self._last_log_time = now except Exception as e: print(f"Error writing to log file: {e}") if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None: try: self.current_cycle_file.close() except: pass self.record_button.setChecked(False) self.current_cycle_file = None def start_test(self): """Start the selected test mode""" if self.current_mode == "Cycle Test": self.start_cycle_test() elif self.current_mode == "Discharge Test": self.start_discharge_test() elif self.current_mode == "Charge Test": self.start_charge_test() elif self.current_mode == "Live Monitoring": self.start_live_monitoring() def start_cycle_test(self): """Start the battery cycle test""" # Clean up any previous test if hasattr(self, 'test_sequence_worker'): try: self.test_sequence_worker.stop() except: pass self.test_sequence_worker.deleteLater() if hasattr(self, 'test_sequence_thread'): self.test_sequence_thread.quit() self.test_sequence_thread.wait() self.test_sequence_thread.deleteLater() del self.test_sequence_thread self.reset_test() self.reset_plot() if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() # Reset stop flag self.request_stop = False if not self.test_running: try: # Get parameters from UI self.capacity = float(self.capacity_input.text()) self.charge_cutoff = float(self.charge_cutoff_input.text()) self.discharge_cutoff = float(self.discharge_cutoff_input.text()) self.rest_time = float(self.rest_time_input.text()) self.c_rate = float(self.c_rate_input.text()) # Validate inputs if self.capacity <= 0: raise ValueError("Battery capacity must be positive") if self.charge_cutoff <= self.discharge_cutoff: raise ValueError("Charge cutoff must be higher than discharge cutoff") if self.c_rate <= 0: raise ValueError("C-rate must be positive") test_current = self.c_rate * self.capacity if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") # Clear ALL previous data completely with self.plot_mutex: self.time_data.clear() self.voltage_data.clear() self.current_data.clear() self.phase_data.clear() # Reset capacities and timing self.start_time = time.time() self.last_update_time = self.start_time self.capacity_ah = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 self.energy = 0.0 # Reset measurement thread's timer and queues if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() self.measurement_thread.voltage_window.clear() self.measurement_thread.current_window.clear() with self.measurement_thread.measurement_queue.mutex: self.measurement_thread.measurement_queue.queue.clear() # Reset plot completely self.reset_plot() # Start test self.test_running = True self.start_time = time.time() self.last_update_time = time.time() self.test_phase = "Initial Discharge" self.phase_label.setText(self.test_phase) self.start_button.setEnabled(False) self.stop_button.setEnabled(True) self.status_bar.showMessage(f"Cycle test started | Current: {test_current:.4f}A") # Create log file self.create_cycle_log_file() # Start test sequence in a QThread self.test_sequence_thread = QThread() self.test_sequence_worker = TestSequenceWorker( self.dev, test_current, self.charge_cutoff, self.discharge_cutoff, self.rest_time, self.continuous_mode_check.isChecked(), self # Pass reference to main window for callbacks ) self.test_sequence_worker.moveToThread(self.test_sequence_thread) # Connect signals self.test_sequence_worker.update_phase.connect(self.update_test_phase) self.test_sequence_worker.update_status.connect(self.status_bar.showMessage) self.test_sequence_worker.test_completed.connect(self.finalize_test) self.test_sequence_worker.error_occurred.connect(self.handle_test_error) self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit) self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater) self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater) # Start the thread and the worker's run method self.test_sequence_thread.start() QTimer.singleShot(0, self.test_sequence_worker.run) except Exception as e: QMessageBox.critical(self, "Error", str(e)) # Ensure buttons are in correct state if error occurs self.start_button.setEnabled(True) self.stop_button.setEnabled(False) def start_discharge_test(self): """Start the battery discharge test""" # Clean up any previous test self.reset_test() # löscht time_data, voltage_data, current_data, display_*, phase_data self.reset_plot() if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() if hasattr(self, 'discharge_worker'): try: self.discharge_worker.stop() except: pass self.discharge_worker.deleteLater() if hasattr(self, 'discharge_thread'): self.discharge_thread.quit() self.discharge_thread.wait() # warte unbegrenzt, bis er wirklich fertig ist self.discharge_thread.deleteLater() del self.discharge_thread # Reset stop flag self.request_stop = False if not self.test_running: try: # Get parameters from UI self.capacity = float(self.capacity_input.text()) self.discharge_cutoff = float(self.discharge_cutoff_input.text()) self.c_rate = float(self.c_rate_input.text()) # Validate inputs if self.capacity <= 0: raise ValueError("Battery capacity must be positive") if self.c_rate <= 0: raise ValueError("C-rate must be positive") test_current = self.c_rate * self.capacity if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") # Clear ALL previous data completely with self.plot_mutex: self.time_data.clear() self.voltage_data.clear() self.current_data.clear() # Reset capacities and timing self.start_time = time.time() self.last_update_time = self.start_time self.capacity_ah = 0.0 self.energy = 0.0 self.cycle_count = 1 # Reset measurement thread's timer and queues if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() self.measurement_thread.voltage_window.clear() self.measurement_thread.current_window.clear() with self.measurement_thread.measurement_queue.mutex: self.measurement_thread.measurement_queue.queue.clear() # Reset plot completely self.reset_plot() # Start test self.test_running = True self.start_time = time.time() self.last_update_time = time.time() self.test_phase = "Discharge" self.phase_label.setText(self.test_phase) self.start_button.setEnabled(False) self.stop_button.setEnabled(True) self.status_bar.showMessage(f"Discharge started | Current: {test_current:.4f}A") # Create log file self.create_cycle_log_file() # Start discharge worker in a QThread self.discharge_thread = QThread() self.discharge_worker = DischargeWorker( self.dev, test_current, self.discharge_cutoff, self # Pass reference to main window for callbacks ) self.discharge_worker.moveToThread(self.discharge_thread) # Connect signals self.discharge_worker.update_status.connect(self.status_bar.showMessage) self.discharge_worker.test_completed.connect(self.finalize_test) self.discharge_worker.error_occurred.connect(self.handle_test_error) self.discharge_worker.finished.connect(self.discharge_thread.quit) self.discharge_worker.finished.connect(self.discharge_worker.deleteLater) self.discharge_thread.finished.connect(self.discharge_thread.deleteLater) # Start the thread and the worker's run method self.discharge_thread.start() QTimer.singleShot(0, self.discharge_worker.run) except Exception as e: QMessageBox.critical(self, "Error", str(e)) # Ensure buttons are in correct state if error occurs self.start_button.setEnabled(True) self.stop_button.setEnabled(False) def start_charge_test(self): """Start the battery charge test""" # Clean up any previous test if hasattr(self, 'charge_worker'): try: self.charge_worker.stop() except: pass self.charge_worker.deleteLater() if hasattr(self, 'charge_thread'): self.charge_thread.quit() self.charge_thread.wait() self.charge_thread.deleteLater() del self.charge_thread self.reset_test() self.reset_plot() if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() # Reset stop flag self.request_stop = False if not self.test_running: try: # Get parameters from UI self.capacity = float(self.capacity_input.text()) self.charge_cutoff = float(self.charge_cutoff_input.text()) self.c_rate = float(self.c_rate_input.text()) # Validate inputs if self.capacity <= 0: raise ValueError("Battery capacity must be positive") if self.c_rate <= 0: raise ValueError("C-rate must be positive") test_current = self.c_rate * self.capacity if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") # Clear ALL previous data completely with self.plot_mutex: self.time_data.clear() self.voltage_data.clear() self.current_data.clear() # Reset capacities and timing self.start_time = time.time() self.last_update_time = self.start_time self.capacity_ah = 0.0 self.energy = 0.0 self.cycle_count = 1 # Reset measurement thread if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() self.measurement_thread.voltage_window.clear() self.measurement_thread.current_window.clear() with self.measurement_thread.measurement_queue.mutex: self.measurement_thread.measurement_queue.queue.clear() # Reset plot self.reset_plot() # Start test self.test_running = True self.start_time = time.time() self.last_update_time = time.time() self.test_phase = "Charge" self.phase_label.setText(self.test_phase) self.start_button.setEnabled(False) self.stop_button.setEnabled(True) self.status_bar.showMessage(f"Charge started @ {test_current:.3f}A to {self.charge_cutoff}V") # Create log file self.create_cycle_log_file() # Start charge worker in a QThread self.charge_thread = QThread() self.charge_worker = ChargeWorker( self.dev, test_current, self.charge_cutoff, self ) self.charge_worker.moveToThread(self.charge_thread) # Connect signals self.charge_worker.update_status.connect(self.status_bar.showMessage) self.charge_worker.test_completed.connect(self.finalize_test) self.charge_worker.error_occurred.connect(self.handle_test_error) self.charge_worker.finished.connect(self.charge_thread.quit) self.charge_worker.finished.connect(self.charge_worker.deleteLater) self.charge_thread.finished.connect(self.charge_thread.deleteLater) # Start the thread self.charge_thread.start() QTimer.singleShot(0, self.charge_worker.run) except Exception as e: QMessageBox.critical(self, "Error", str(e)) self.start_button.setEnabled(True) self.stop_button.setEnabled(False) def start_live_monitoring(self): """Start live monitoring mode""" try: # Reset everything completely self.reset_test() # Reset measurement timing if hasattr(self, 'measurement_thread'): self.measurement_thread.start_time = time.time() # Set monitoring flags self.test_running = True self.test_phase = "Live Monitoring" self.phase_label.setText(self.test_phase) # Update UI self.stop_button.setEnabled(True) self.start_button.setEnabled(False) # Configure device for monitoring if hasattr(self, 'dev'): try: self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].mode = pysmu.Mode.HI_Z except Exception as e: print(f"Error configuring device for monitoring: {e}") self.status_bar.showMessage("Live monitoring started") except Exception as e: print(f"Error starting live monitoring: {e}") self.test_running = False QMessageBox.critical(self, "Error", f"Failed to start monitoring:\n{str(e)}") def create_cycle_log_file(self): """Create a new log file for the current test""" try: self._last_log_time = time.time() # Close previous file if exists if hasattr(self, 'current_cycle_file') and self.current_cycle_file: try: self.current_cycle_file.close() except Exception as e: print(f"Error closing previous log file: {e}") # Ensure log directory exists os.makedirs(self.log_dir, exist_ok=True) if not os.access(self.log_dir, os.W_OK): QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}") return False # Generate filename based on mode timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if self.current_mode == "Cycle Test": self.filename = os.path.join(self.log_dir, f"battery_cycle_{timestamp}.csv") elif self.current_mode == "Discharge Test": self.filename = os.path.join(self.log_dir, f"battery_discharge_{timestamp}.csv") else: # Live Monitoring self.filename = os.path.join(self.log_dir, f"battery_live_{timestamp}.csv") # Open new file try: self.current_cycle_file = open(self.filename, 'w', newline='') # Write header with test parameters test_current = self.c_rate * self.capacity test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" self.current_cycle_file.write(f"# ADALM1000 Battery Test Log - {self.current_mode}\n") self.current_cycle_file.write(f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") self.current_cycle_file.write(f"# Battery Capacity: {self.capacity} Ah\n") if self.current_mode != "Live Monitoring": self.current_cycle_file.write(f"# Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n") if self.current_mode == "Cycle Test": self.current_cycle_file.write(f"# Charge Cutoff: {self.charge_cutoff} V\n") self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n") self.current_cycle_file.write(f"# Rest Time: {self.rest_time} hours\n") elif self.current_mode == "Discharge Test": self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n") self.current_cycle_file.write(f"# Test Conditions/Chemistry: {test_conditions}\n") self.current_cycle_file.write("#\n") # Write data header self.log_writer = csv.writer(self.current_cycle_file) if self.current_mode == "Cycle Test": self.log_writer.writerow([ "Time(s)", "Voltage(V)", "Current(A)", "Phase", "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", "Coulomb_Eff(%)", "Cycle" ]) else: self.log_writer.writerow([ "Time(s)", "Voltage(V)", "Current(A)", "Phase", "Capacity(Ah)", "Power(W)", "Energy(Wh)", "Cycle" ]) return True except Exception as e: QMessageBox.critical(self, "Error", f"Failed to create log file: {e}") return False except Exception as e: print(f"Error in create_cycle_log_file: {e}") return False def finalize_log_file(self): """Finalize the current log file""" if hasattr(self, 'current_cycle_file') and self.current_cycle_file: try: test_current = self.c_rate * self.capacity test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" self.current_cycle_file.write("\n# TEST SUMMARY\n") self.current_cycle_file.write(f"# Test Parameters:\n") self.current_cycle_file.write(f"# - Battery Capacity: {self.capacity} Ah\n") if self.current_mode != "Live Monitoring": self.current_cycle_file.write(f"# - Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n") if self.current_mode == "Cycle Test": self.current_cycle_file.write(f"# - Charge Cutoff: {self.charge_cutoff} V\n") self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n") self.current_cycle_file.write(f"# - Rest Time: {self.rest_time} hours\n") elif self.current_mode == "Discharge Test": self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n") self.current_cycle_file.write(f"# - Test Conditions: {test_conditions}\n") self.current_cycle_file.write(f"# Results:\n") if self.current_mode == "Cycle Test": self.current_cycle_file.write(f"# - Cycles Completed: {self.cycle_count}\n") self.current_cycle_file.write(f"# - Final Discharge Capacity: {self.capacity_ah:.4f} Ah\n") self.current_cycle_file.write(f"# - Final Charge Capacity: {self.charge_capacity:.4f} Ah\n") self.current_cycle_file.write(f"# - Coulombic Efficiency: {self.coulomb_efficiency:.1f}%\n") else: self.current_cycle_file.write(f"# - Capacity: {self.capacity_ah:.4f} Ah\n") self.current_cycle_file.write(f"# - Energy: {self.energy:.4f} Wh\n") self.current_cycle_file.close() except Exception as e: print(f"Error closing log file: {e}") finally: self.current_cycle_file = None def format_time(self, seconds): """Convert seconds to hh:mm:ss format""" hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = int(seconds % 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def stop_test(self): """Request immediate stop of the current test or monitoring""" if not self.test_running and not (hasattr(self, 'record_button') and self.record_button.isChecked()): return self.request_stop = True self.test_running = False self.measuring = False # Stop any active test threads if hasattr(self, 'test_sequence_worker'): try: if not sip.isdeleted(self.test_sequence_worker): self.test_sequence_worker.stop() except: pass if hasattr(self, 'discharge_worker'): try: if not sip.isdeleted(self.discharge_worker): self.discharge_worker.stop() except: pass # Stop recording if active if hasattr(self, 'record_button') and self.record_button.isChecked(): self.record_button.setChecked(False) if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None: self.finalize_log_file() self.record_button.setText("Start Recording") # Reset device to safe state if hasattr(self, 'dev'): try: self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].mode = pysmu.Mode.HI_Z except Exception as e: print(f"Error resetting device: {e}") # Clear all data buffers with self.plot_mutex: self.time_data.clear() self.voltage_data.clear() self.current_data.clear() if hasattr(self, 'phase_data'): self.phase_data.clear() # Reset measurements self.capacity_ah = 0.0 self.energy = 0.0 if hasattr(self, 'charge_capacity'): self.charge_capacity = 0.0 if hasattr(self, 'coulomb_efficiency'): self.coulomb_efficiency = 0.0 # Reset plot self.reset_plot() # Update UI self.test_phase = "Idle" self.phase_label.setText(self.test_phase) self.stop_button.setEnabled(False) self.start_button.setEnabled(True) if self.current_mode == "Live Monitoring": self.status_bar.showMessage("Live monitoring stopped") else: self.status_bar.showMessage("Test stopped - Ready for new test") def finalize_test(self): """Final cleanup after test completes or is stopped""" try: # 1. Stop any active measurement or test operations self.measuring = False self.test_running = False # 2. Reset device to safe state if hasattr(self, 'dev'): try: self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].mode = pysmu.Mode.HI_Z except Exception as e: print(f"Error resetting device in finalize: {e}") # 3. Clean up test sequence thread safely if hasattr(self, 'test_sequence_thread'): try: if self.test_sequence_thread.isRunning(): if hasattr(self, 'test_sequence_worker'): try: self.test_sequence_worker.stop() except RuntimeError: pass self.test_sequence_thread.quit() self.test_sequence_thread.wait(500) except Exception as e: print(f"Error stopping test sequence thread: {e}") finally: if hasattr(self, 'test_sequence_worker'): try: if not sip.isdeleted(self.test_sequence_worker): self.test_sequence_worker.deleteLater() except: pass if hasattr(self, 'test_sequence_thread'): try: if not sip.isdeleted(self.test_sequence_thread): self.test_sequence_thread.deleteLater() except: pass finally: if hasattr(self, 'test_sequence_thread'): del self.test_sequence_thread # 4. Clean up discharge thread safely if hasattr(self, 'discharge_thread'): try: if self.discharge_thread.isRunning(): if hasattr(self, 'discharge_worker'): try: self.discharge_worker.stop() except RuntimeError: pass self.discharge_thread.quit() self.discharge_thread.wait(500) except Exception as e: print(f"Error stopping discharge thread: {e}") finally: if hasattr(self, 'discharge_worker'): try: if not sip.isdeleted(self.discharge_worker): self.discharge_worker.deleteLater() except: pass if hasattr(self, 'discharge_thread'): try: if not sip.isdeleted(self.discharge_thread): self.discharge_thread.deleteLater() except: pass finally: if hasattr(self, 'discharge_thread'): del self.discharge_thread # 5. Clean up charge thread safely (using same pattern as discharge thread) if hasattr(self, 'charge_thread'): try: if self.charge_thread.isRunning(): if hasattr(self, 'charge_worker'): try: self.charge_worker.stop() except RuntimeError: pass self.charge_thread.quit() self.charge_thread.wait(500) except Exception as e: print(f"Error stopping charge thread: {e}") finally: if hasattr(self, 'charge_worker'): try: if not sip.isdeleted(self.charge_worker): self.charge_worker.deleteLater() except: pass if hasattr(self, 'charge_thread'): try: if not sip.isdeleted(self.charge_thread): self.charge_thread.deleteLater() except: pass finally: if hasattr(self, 'charge_thread'): del self.charge_thread # 6. Finalize log file self.finalize_log_file() # 7. Reset UI and state self.request_stop = False self.start_button.setEnabled(True) self.stop_button.setEnabled(False) # 8. Show completion message if test wasn't stopped by user if not self.request_stop: test_current = self.c_rate * self.capacity test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" if self.current_mode == "Cycle Test": message = ( f"Cycle test completed | " f"Cycle {self.cycle_count} | " f"Capacity: {self.capacity_ah:.4f}Ah | " f"Efficiency: {self.coulomb_efficiency:.1f}%" ) QMessageBox.information( self, "Test Completed", f"Cycle test completed successfully.\n\n" f"Test Parameters:\n" f"- Capacity: {self.capacity} Ah\n" f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n" f"- Charge Cutoff: {self.charge_cutoff} V\n" f"- Discharge Cutoff: {self.discharge_cutoff} V\n" f"- Conditions: {test_conditions}\n\n" f"Results:\n" f"- Cycles: {self.cycle_count}\n" f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n" f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%" ) elif self.current_mode == "Discharge Test": message = ( f"Discharge completed | " f"Capacity: {self.capacity_ah:.4f}Ah | " f"Energy: {self.energy:.4f}Wh" ) QMessageBox.information( self, "Discharge Completed", f"Discharge test completed successfully.\n\n" f"Test Parameters:\n" f"- Capacity: {self.capacity} Ah\n" f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n" f"- Discharge Cutoff: {self.discharge_cutoff} V\n" f"- Conditions: {test_conditions}\n\n" f"Results:\n" f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n" f"- Energy delivered: {self.energy:.4f}Wh" ) self.status_bar.showMessage(message) except Exception as e: print(f"Error in finalize_test: {e}") import traceback traceback.print_exc() # Ensure we don't leave the UI in a locked state self.start_button.setEnabled(True) self.stop_button.setEnabled(False) self.status_bar.showMessage("Error during test finalization") def reset_plot(self): """Completely reset the plot - clears all data and visuals""" # Clear line data self.line_voltage.set_data([], []) self.line_current.set_data([], []) # Reset axes with appropriate ranges voltage_padding = 0.2 min_voltage = 0 max_voltage = 5.0 # Max voltage for ADALM1000 self.ax.set_xlim(0, 10) # Reset X axis self.ax.set_ylim(min_voltage, max_voltage) self.ax.set_xlabel('Time (s)', color=self.fg_color) self.ax.set_ylabel("Voltage (V)", color='#00BFFF') self.ax.set_title('Battery Test', color=self.fg_color) self.ax.tick_params(axis='x', colors=self.fg_color) self.ax.tick_params(axis='y', labelcolor='#00BFFF') self.ax.grid(True, color='#4C566A') # Reset twin axis (current) current_padding = 0.05 self.ax2.set_xlim(0, 10) self.ax2.set_ylim(-0.25 - current_padding, 0.25 + current_padding) self.ax2.set_ylabel("Current (A)", color='r') self.ax2.tick_params(axis='y', labelcolor='r') # Redraw legends self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) # Force immediate redraw self.canvas.draw() def update_status_and_plot(self): """Combined status and plot update""" self.update_status() self.update_plot() def update_plot(self): """More robust plotting with error handling""" try: # Create local copies of data safely with self.plot_mutex: if not self.display_time_data: return x_data = np.array(self.display_time_data) y1_data = np.array(self.display_voltage_data) y2_data = np.array(self.display_current_data) # Update plot data self.line_voltage.set_data(x_data, y1_data) self.line_current.set_data(x_data, y2_data) # Auto-scale when needed if len(x_data) > 1: self.auto_scale_axes() # Force redraw self.canvas.draw_idle() except Exception as e: print(f"Plot error: {e}") # Attempt to recover self.reset_plot() def auto_scale_axes(self): """Auto-scale plot axes with appropriate padding and strict boundaries""" if not self.time_data: return min_time = 0 max_time = self.time_data[-1] current_xlim = self.ax.get_xlim() if max_time > current_xlim[1] * 0.95: new_max = max_time * 1.05 self.ax.set_xlim(min_time, new_max) self.ax2.set_xlim(min_time, new_max) voltage_padding = 0.2 if self.voltage_data: min_voltage = max(0, min(self.voltage_data) - voltage_padding) max_voltage = min(5.0, max(self.voltage_data) + voltage_padding) current_ylim = self.ax.get_ylim() if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1): self.ax.set_ylim(min_voltage, max_voltage) current_padding = 0.05 if self.current_data: min_current = max(-0.25, min(self.current_data) - current_padding) max_current = min(0.25, max(self.current_data) + current_padding) current_ylim2 = self.ax2.get_ylim() if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02): self.ax2.set_ylim(min_current, max_current) @pyqtSlot(str) def handle_device_error(self, error): """Handle device connection errors""" error_msg = str(error) print(f"Device error: {error_msg}") if hasattr(self, 'session'): try: if self.session_active: self.session.end() del self.session except Exception as e: print(f"Error cleaning up session: {e}") self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;") self.connection_label.setText("Disconnected") self.status_bar.showMessage(f"Device error: {error_msg}") self.session_active = False self.test_running = False self.continuous_mode = False self.measuring = False self.start_button.setEnabled(False) self.stop_button.setEnabled(False) self.time_data.clear() self.voltage_data.clear() self.current_data.clear() @pyqtSlot(str) def update_test_phase(self, phase_text): """Update the test phase display""" self.test_phase = phase_text self.phase_label.setText(phase_text) @pyqtSlot(str) def handle_test_error(self, error_msg): """Handle errors from the test sequence with complete cleanup""" try: # 1. Notify user QMessageBox.critical(self, "Test Error", f"An error occurred:\n{error_msg}\n\nAttempting to recover...") # 2. Stop all operations self.stop_test() # 3. Reset UI elements if hasattr(self, 'line_voltage'): try: self.line_voltage.set_data([], []) self.line_current.set_data([], []) self.ax.set_xlim(0, 1) self.ax2.set_xlim(0, 1) self.canvas.draw() except Exception as plot_error: print(f"Plot reset error: {plot_error}") # 4. Update status self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...") self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") # 5. Attempt recovery QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect except Exception as e: print(f"Error in error handler: {e}") # Fallback - restart application? QMessageBox.critical(self, "Fatal Error", "The application needs to restart due to an unrecoverable error") QTimer.singleShot(1000, self.close) def attempt_reconnect(self): """Attempt to reconnect automatically""" QMessageBox.critical( self, "Device Connection Error", "Could not connect to ADALM1000\n\n" "1. Check USB cable connection\n" "2. The device will attempt to reconnect automatically" ) QTimer.singleShot(1000, self.reconnect_device) def reconnect_device(self): """Robust reconnection handler with device persistence""" self.status_bar.showMessage("Reconnecting...") # Remember current selection current_serial = self.dev.serial if hasattr(self, 'dev') else None # Cleanup existing connection if hasattr(self, 'session'): try: self.session.end() except Exception as e: print(f"Error ending session: {e}") if hasattr(self, 'measurement_thread'): self.measurement_thread.stop() self.measurement_thread.wait(500) time.sleep(1) # Allow for USB reinitialization try: # Reinitialize session self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) if not self.session.devices: raise DeviceDisconnectedError("No devices available") # Repopulate device list self.device_combo.clear() for dev in self.session.devices: self.device_combo.addItem(dev.serial) # Try to reselect previous device target_index = 0 if current_serial: for i, dev in enumerate(self.session.devices): if dev.serial == current_serial: target_index = i break self.dev = self.session.devices[target_index] self.device_combo.setCurrentIndex(target_index) # Configure and start self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].constant(0) device_index = self.session.devices.index(self.dev) self.session.start(device_index) # Update UI self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") self.connection_label.setText(f"Reconnected: {self.dev.serial}") self.status_bar.showMessage(f"Device {self.dev.serial} ready") self.session_active = True # Restart measurement self.measurement_thread = MeasurementThread(self.dev, self.interval) self.measurement_thread.update_signal.connect(self.update_measurements) self.measurement_thread.error_signal.connect(self.handle_device_error) self.measurement_thread.start() except Exception as e: self.status_bar.showMessage("Reconnect failed - retrying...") QTimer.singleShot(2000, self.reconnect_device) def closeEvent(self, event): """Clean up on window close""" self.test_running = False self.measuring = False self.session_active = False # Stop measurement thread if hasattr(self, 'measurement_thread'): self.measurement_thread.stop() # Stop test sequence thread if hasattr(self, 'test_sequence_thread'): if hasattr(self, 'test_sequence_worker'): self.test_sequence_worker.stop() self.test_sequence_thread.quit() self.test_sequence_thread.wait(500) # Stop discharge thread if hasattr(self, 'discharge_thread'): if hasattr(self, 'discharge_worker'): self.discharge_worker.stop() self.discharge_thread.quit() self.discharge_thread.wait(500) # Clean up device session if hasattr(self, 'session') and self.session: try: self.session.end() except Exception as e: print(f"Error ending session: {e}") event.accept() if __name__ == "__main__": app = QApplication([]) try: window = BatteryTester() window.show() app.exec_() except Exception as e: QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}")