From 418885aca8df8eb3479a0895cbbccf6f237f9c01 Mon Sep 17 00:00:00 2001 From: Jan Date: Thu, 3 Jul 2025 19:07:53 +0200 Subject: [PATCH] MainCode/adalm1000_logger.py aktualisiert Working fine (D) --- MainCode/adalm1000_logger.py | 1164 ++++++++++++++++++++++++++++++++-- 1 file changed, 1105 insertions(+), 59 deletions(-) diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py index cee0173..e2c78ff 100644 --- a/MainCode/adalm1000_logger.py +++ b/MainCode/adalm1000_logger.py @@ -1,69 +1,1115 @@ -def charge_phase(self): - """Handle the battery charging phase""" - self.update_phase.emit("Charge") - self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.3f}A") - - try: - # Configure both channels properly - self.device.channels['B'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].mode = pysmu.Mode.SIMV - self.device.channels['A'].sink = False # Ensure sourcing current - self.device.channels['A'].constant(self.test_current) - - # Small delay to allow current to stabilize - time.sleep(0.1) - +# -*- coding: utf-8 -*- +import os +import time +import csv +import threading +from datetime import datetime +import numpy as np +import matplotlib +matplotlib.use('Qt5Agg') +from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas +from matplotlib.figure import Figure +from collections import deque +from queue import Queue, Full, Empty + +from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, + QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog) +from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread +import pysmu + +class DeviceDisconnectedError(Exception): + pass + +class MeasurementThread(QThread): + update_signal = pyqtSignal(float, float, float) + error_signal = pyqtSignal(str) + + def __init__(self, device, interval=0.1): + super().__init__() + self.device = device + self.interval = interval + self._running = False + self.filter_window_size = 10 + self.voltage_window = [] + self.current_window = [] + self.start_time = time.time() + self.measurement_queue = Queue(maxsize=1) + + def run(self): + self._running = True while self._running: - voltage, current = self.get_latest_measurement() - if voltage is None: + try: + samples = self.device.read(self.filter_window_size, 500, True) + if not samples: + raise DeviceDisconnectedError("No samples received") + + current_time = time.time() - self.start_time + + # Get voltage from Channel B (HI_Z mode) and current from Channel A + raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage + raw_current = np.mean([s[0][1] for s in samples]) # Channel A current + + # Update filter windows + self.voltage_window.append(raw_voltage) + self.current_window.append(raw_current) + + if len(self.voltage_window) > self.filter_window_size: + self.voltage_window.pop(0) + self.current_window.pop(0) + + voltage = np.mean(self.voltage_window) + current = np.mean(self.current_window) + + # Emit update + self.update_signal.emit(voltage, current, current_time) + + # Store measurement + try: + self.measurement_queue.put_nowait((voltage, current)) + except Full: + pass + + time.sleep(max(0.05, self.interval)) + + except Exception as e: + self.error_signal.emit(f"Read error: {str(e)}") + time.sleep(1) continue - - # Update parent's data for logging/display - with self.parent.plot_mutex: - if len(self.parent.voltage_data) > 0: - self.parent.voltage_data[-1] = voltage - self.parent.current_data[-1] = current + + def stop(self): + self._running = False + self.wait(500) + +class TestSequenceWorker(QObject): + finished = pyqtSignal() + update_phase = pyqtSignal(str) + update_status = pyqtSignal(str) + test_completed = pyqtSignal() + error_occurred = pyqtSignal(str) + + def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent): + super().__init__() + self.device = device + self.test_current = test_current + self.charge_cutoff = charge_cutoff + self.discharge_cutoff = discharge_cutoff + self.rest_time = rest_time * 3600 # Convert hours to seconds + self.continuous_mode = continuous_mode + self.parent = parent + self._running = True + self.voltage_timeout = 0.5 # seconds + + def get_latest_measurement(self): + """Thread-safe measurement reading with timeout""" + try: + return self.parent.measurement_thread.measurement_queue.get( + timeout=self.voltage_timeout + ) + except Empty: + return (None, None) # Return tuple for unpacking + + def charge_phase(self): + """Handle the battery charging phase""" + self.update_phase.emit("Charge") + self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.3f}A") + + try: + # Configure channels - Channel A sources current, Channel B measures voltage + self.device.channels['B'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].constant(self.test_current) - if voltage >= self.charge_cutoff: - break - + # Small delay to allow current to stabilize time.sleep(0.1) - finally: + while self._running: + voltage, current = self.get_latest_measurement() + if voltage is None: + continue + + # Update parent's data for logging/display + with self.parent.plot_mutex: + if len(self.parent.voltage_data) > 0: + self.parent.voltage_data[-1] = voltage + self.parent.current_data[-1] = current + + if voltage >= self.charge_cutoff: + break + + time.sleep(0.1) + + finally: + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) + + def discharge_phase(self): + """Handle the battery discharging phase""" + self.update_phase.emit("Discharge") + self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A") + + try: + # Configure channels - Channel A sinks current, Channel B measures voltage + self.device.channels['B'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].constant(-self.test_current) + + # Small delay to allow current to stabilize + time.sleep(0.1) + + while self._running: + voltage, current = self.get_latest_measurement() + if voltage is None: + continue + + # Update parent's data for logging/display + with self.parent.plot_mutex: + if len(self.parent.voltage_data) > 0: + self.parent.voltage_data[-1] = voltage + self.parent.current_data[-1] = current + + if voltage <= self.discharge_cutoff: + break + + time.sleep(0.1) + + finally: + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) + + def rest_phase(self, phase_name): + """Handle rest period between phases""" + self.update_phase.emit(f"Resting ({phase_name})") + rest_end = time.time() + self.rest_time + + while time.time() < rest_end and self._running: + time_left = max(0, rest_end - time.time()) + self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min") + time.sleep(1) + + def stop(self): + """Request the thread to stop""" + self._running = False self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) -def discharge_phase(self): - """Handle the battery discharging phase""" - self.update_phase.emit("Discharge") - self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A") + def run(self): + """Main test sequence loop""" + try: + while self._running and (self.continuous_mode or self.parent.cycle_count == 0): + # Reset stop request at start of each cycle + self.parent.request_stop = False + self.parent.cycle_count += 1 + + # 1. Charge phase (constant current) + self.charge_phase() + if not self._running or self.parent.request_stop: + break + + # 2. Rest period after charge + self.rest_phase("Post-Charge") + if not self._running or self.parent.request_stop: + break + + # 3. Discharge phase (capacity measurement) + self.discharge_phase() + if not self._running or self.parent.request_stop: + break + + # 4. Rest period after discharge (only if not stopping) + if self._running and not self.parent.request_stop: + self.rest_phase("Post-Discharge") + + # Calculate Coulomb efficiency if not stopping + if not self.parent.request_stop and self.parent.charge_capacity > 0: + self.parent.coulomb_efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100 + + # Test completed + self.test_completed.emit() + + except Exception as e: + self.error_occurred.emit(f"Test sequence error: {str(e)}") + finally: + self.finished.emit() + +class BatteryTester(QMainWindow): + def __init__(self): + self.plot_mutex = threading.Lock() + super().__init__() + + # Color scheme + self.bg_color = "#2E3440" + self.fg_color = "#D8DEE9" + self.accent_color = "#5E81AC" + self.warning_color = "#BF616A" + self.success_color = "#A3BE8C" + + # Device and measurement state + self.session_active = False + self.measuring = False + self.test_running = False + self.continuous_mode = False + self.request_stop = False + self.interval = 0.1 + self.log_dir = os.path.expanduser("~/adalm1000/logs") + os.makedirs(self.log_dir, exist_ok=True) + + # Data buffers + self.time_data = deque() + self.voltage_data = deque() + self.current_data = deque() + self.phase_data = deque() + + # Initialize UI and device + self.setup_ui() + self.init_device() + + # Set window properties + self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)") + self.resize(1000, 800) + self.setMinimumSize(800, 700) + + # Status update timer + self.status_timer = QTimer() + self.status_timer.timeout.connect(self.update_status) + self.status_timer.start(1000) # Update every second + + def setup_ui(self): + """Configure the user interface""" + # Main widget and layout + self.central_widget = QWidget() + self.setCentralWidget(self.central_widget) + self.main_layout = QVBoxLayout(self.central_widget) + self.main_layout.setContentsMargins(10, 10, 10, 10) + + # Header area + header_frame = QFrame() + header_frame.setFrameShape(QFrame.NoFrame) + header_layout = QHBoxLayout(header_frame) + header_layout.setContentsMargins(0, 0, 0, 0) + + self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)") + self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};") + header_layout.addWidget(self.title_label, 1) + + # Status indicator + self.status_light = QLabel() + self.status_light.setFixedSize(20, 20) + self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") + header_layout.addWidget(self.status_light) + + self.connection_label = QLabel("Disconnected") + header_layout.addWidget(self.connection_label) + + # Reconnect button + self.reconnect_btn = QPushButton("Reconnect") + self.reconnect_btn.clicked.connect(self.reconnect_device) + header_layout.addWidget(self.reconnect_btn) + + self.main_layout.addWidget(header_frame) + + # Measurement display + display_frame = QFrame() + display_frame.setFrameShape(QFrame.StyledPanel) + display_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") + display_layout = QGridLayout(display_frame) + + # Measurement values + measurement_labels = [ + ("Voltage (V)", "V"), + ("Current (A)", "A"), + ("Test Phase", ""), + ("Elapsed Time", "s"), + ("Discharge Capacity", "Ah"), + ("Charge Capacity", "Ah"), + ("Coulomb Eff.", "%"), + ("Cycle Count", ""), + ] + + self.value_labels = {} + for i, (label, unit) in enumerate(measurement_labels): + row = i // 2 + col = (i % 2) * 2 + + lbl = QLabel(f"{label}:") + lbl.setStyleSheet(f"color: {self.fg_color};") + display_layout.addWidget(lbl, row, col) + + value_lbl = QLabel("0.000") + value_lbl.setStyleSheet(f"color: {self.fg_color}; font-weight: bold;") + display_layout.addWidget(value_lbl, row, col + 1) + + if unit: + unit_lbl = QLabel(unit) + unit_lbl.setStyleSheet(f"color: {self.fg_color};") + display_layout.addWidget(unit_lbl, row, col + 2) + + # Store references to important labels + if i == 0: + self.voltage_label = value_lbl + elif i == 1: + self.current_label = value_lbl + elif i == 2: + self.phase_label = value_lbl + elif i == 3: + self.time_label = value_lbl + elif i == 4: + self.capacity_label = value_lbl + elif i == 5: + self.charge_capacity_label = value_lbl + elif i == 6: + self.efficiency_label = value_lbl + elif i == 7: + self.cycle_label = value_lbl + + self.main_layout.addWidget(display_frame) + + # Control area + controls_frame = QFrame() + controls_frame.setFrameShape(QFrame.NoFrame) + controls_layout = QHBoxLayout(controls_frame) + controls_layout.setContentsMargins(0, 0, 0, 0) + + # Parameters frame + params_frame = QFrame() + params_frame.setFrameShape(QFrame.StyledPanel) + params_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") + params_layout = QGridLayout(params_frame) + + # Battery capacity + self.capacity = 0.2 + self.capacity_label_input = QLabel("Battery Capacity (Ah):") + self.capacity_label_input.setStyleSheet(f"color: {self.fg_color};") + params_layout.addWidget(self.capacity_label_input, 0, 0) + self.capacity_input = QLineEdit("0.2") + self.capacity_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.capacity_input.setFixedWidth(60) + params_layout.addWidget(self.capacity_input, 0, 1) + + # Charge cutoff + self.charge_cutoff = 1.43 + self.charge_cutoff_label = QLabel("Charge Cutoff (V):") + self.charge_cutoff_label.setStyleSheet(f"color: {self.fg_color};") + params_layout.addWidget(self.charge_cutoff_label, 1, 0) + self.charge_cutoff_input = QLineEdit("1.43") + self.charge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.charge_cutoff_input.setFixedWidth(60) + params_layout.addWidget(self.charge_cutoff_input, 1, 1) + + # Discharge cutoff + self.discharge_cutoff = 0.9 + self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):") + self.discharge_cutoff_label.setStyleSheet(f"color: {self.fg_color};") + params_layout.addWidget(self.discharge_cutoff_label, 2, 0) + self.discharge_cutoff_input = QLineEdit("0.9") + self.discharge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.discharge_cutoff_input.setFixedWidth(60) + params_layout.addWidget(self.discharge_cutoff_input, 2, 1) + + # Rest time + self.rest_time = 0.25 + self.rest_time_label = QLabel("Rest Time (hours):") + self.rest_time_label.setStyleSheet(f"color: {self.fg_color};") + params_layout.addWidget(self.rest_time_label, 3, 0) + self.rest_time_input = QLineEdit("0.25") + self.rest_time_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.rest_time_input.setFixedWidth(60) + params_layout.addWidget(self.rest_time_input, 3, 1) + + # C-rate for test + self.c_rate = 0.1 + self.c_rate_label = QLabel("Test C-rate:") + self.c_rate_label.setStyleSheet(f"color: {self.fg_color};") + params_layout.addWidget(self.c_rate_label, 0, 2) + self.c_rate_input = QLineEdit("0.1") + self.c_rate_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.c_rate_input.setFixedWidth(40) + params_layout.addWidget(self.c_rate_input, 0, 3) + + c_rate_note = QLabel("(e.g., 0.2 for C/5)") + c_rate_note.setStyleSheet(f"color: {self.fg_color};") + params_layout.addWidget(c_rate_note, 0, 4) + + controls_layout.addWidget(params_frame, 1) + + # Button frame + button_frame = QFrame() + button_frame.setFrameShape(QFrame.NoFrame) + button_layout = QVBoxLayout(button_frame) + button_layout.setContentsMargins(0, 0, 0, 0) + + self.start_button = QPushButton("START TEST") + self.start_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.accent_color}; + color: {self.fg_color}; + font-weight: bold; + padding: 6px; + border-radius: 4px; + }} + QPushButton:disabled {{ + background-color: #4C566A; + color: #D8DEE9; + }} + """) + self.start_button.clicked.connect(self.start_test) + button_layout.addWidget(self.start_button) + + self.stop_button = QPushButton("STOP TEST") + self.stop_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.warning_color}; + color: {self.fg_color}; + font-weight: bold; + padding: 6px; + border-radius: 4px; + }} + QPushButton:disabled {{ + background-color: #4C566A; + color: #D8DEE9; + }} + """) + self.stop_button.clicked.connect(self.stop_test) + self.stop_button.setEnabled(False) + button_layout.addWidget(self.stop_button) + + # Continuous mode checkbox + self.continuous_mode_check = QCheckBox("Continuous Mode") + self.continuous_mode_check.setChecked(True) + self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};") + button_layout.addWidget(self.continuous_mode_check) + + controls_layout.addWidget(button_frame) + self.main_layout.addWidget(controls_frame) + + # Plot area + self.setup_plot() + + # Status bar + self.status_bar = self.statusBar() + self.status_bar.setStyleSheet(f"color: {self.fg_color};") + self.status_bar.showMessage("Ready") + + # Apply dark theme + self.setStyleSheet(f""" + QMainWindow {{ + background-color: {self.bg_color}; + }} + QLabel {{ + color: {self.fg_color}; + }} + QLineEdit {{ + background-color: #3B4252; + color: {self.fg_color}; + border: 1px solid #4C566A; + border-radius: 3px; + padding: 2px; + }} + """) + + def setup_plot(self): + """Configure the matplotlib plot""" + self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color) + self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) + self.ax = self.fig.add_subplot(111) + self.ax.set_facecolor('#3B4252') + + # Set initial voltage range + voltage_padding = 0.2 + min_voltage = max(0, 0.9 - voltage_padding) + max_voltage = 1.43 + voltage_padding + self.ax.set_ylim(min_voltage, max_voltage) + + # Voltage plot + self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) + self.ax.set_ylabel("Voltage (V)", color='#00BFFF') + self.ax.tick_params(axis='y', labelcolor='#00BFFF') + + # Current plot (right axis) + self.ax2 = self.ax.twinx() + current_padding = 0.05 + test_current = 0.1 * 0.2 # Default values + max_current = test_current * 1.5 + self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) + + self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) + self.ax2.set_ylabel("Current (A)", color='r') + self.ax2.tick_params(axis='y', labelcolor='r') + + self.ax.set_xlabel('Time (s)', color=self.fg_color) + self.ax.set_title('Battery Test (CC)', color=self.fg_color) + self.ax.tick_params(axis='x', colors=self.fg_color) + self.ax.grid(True, color='#4C566A') + + # Position legends + self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) + self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) + + # Embed plot + self.canvas = FigureCanvas(self.fig) + self.canvas.setStyleSheet(f"background-color: {self.bg_color};") + self.main_layout.addWidget(self.canvas, 1) + + def init_device(self): + """Initialize the ADALM1000 device with continuous measurement""" + try: + # Clean up any existing session + if hasattr(self, 'session'): + try: + self.session.end() + del self.session + except: + pass + + time.sleep(1) + + self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) + if not self.session.devices: + raise Exception("No ADALM1000 detected - check connections") + + self.dev = self.session.devices[0] + self.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.dev.channels['B'].mode = pysmu.Mode.HI_Z + self.dev.channels['A'].constant(0) + self.dev.channels['B'].constant(0) + + self.session.start(0) + + self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;") + self.connection_label.setText("Connected") + self.status_bar.showMessage("Device connected | Ready to measure") + self.session_active = True + self.start_button.setEnabled(True) + + # Start measurement thread + self.measurement_thread = MeasurementThread(self.dev, self.interval) + self.measurement_thread.update_signal.connect(self.update_measurements) + self.measurement_thread.error_signal.connect(self.handle_device_error) + + # Start the QThread directly (no need for threading.Thread) + self.measurement_thread.start() + + except Exception as e: + self.handle_device_error(str(e)) + + @pyqtSlot(float, float, float) + def update_measurements(self, voltage, current, current_time): + """Update measurements from the measurement thread""" + self.time_data.append(current_time) + self.voltage_data.append(voltage) + self.current_data.append(current) + + # Update display + self.voltage_label.setText(f"{voltage:.4f}") + self.current_label.setText(f"{current:.4f}") + self.time_label.setText(self.format_time(current_time)) + + # Throttle plot updates to avoid recursive repaint + now = time.time() + if not hasattr(self, '_last_plot_update'): + self._last_plot_update = 0 + + if now - self._last_plot_update > 0.1: # Update plot max 10 times per second + self._last_plot_update = now + QTimer.singleShot(0, self.update_plot) + + def update_status(self): + """Update status information periodically""" + if self.test_running: + # Update capacity calculations if in test mode + if self.measuring and self.time_data: + current_time = time.time() - self.start_time + delta_t = current_time - self.last_update_time + self.last_update_time = current_time + + if self.test_phase == "Discharge": + current_current = abs(self.current_data[-1]) + self.capacity_ah += current_current * delta_t / 3600 + self.capacity_label.setText(f"{self.capacity_ah:.4f}") + elif self.test_phase == "Charge": + current_current = abs(self.current_data[-1]) + self.charge_capacity += current_current * delta_t / 3600 + self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}") + + def start_test(self): + """Start the full battery test cycle""" + if not self.test_running: + try: + # Get parameters from UI + self.capacity = float(self.capacity_input.text()) + self.charge_cutoff = float(self.charge_cutoff_input.text()) + self.discharge_cutoff = float(self.discharge_cutoff_input.text()) + self.rest_time = float(self.rest_time_input.text()) + self.c_rate = float(self.c_rate_input.text()) + + # Validate inputs + if self.capacity <= 0: + raise ValueError("Battery capacity must be positive") + if self.charge_cutoff <= self.discharge_cutoff: + raise ValueError("Charge cutoff must be higher than discharge cutoff") + if self.c_rate <= 0: + raise ValueError("C-rate must be positive") + + test_current = self.c_rate * self.capacity + if test_current > 0.2: + raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") + + # Clear previous data + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + self.phase_data.clear() + self.capacity_ah = 0.0 + self.charge_capacity = 0.0 + self.coulomb_efficiency = 0.0 + self.cycle_count = 0 + + # Reset plot with proper ranges + self.reset_plot() + + # Generate filename and create log file + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}") + self.create_cycle_log_file() + + # Start test + self.test_running = True + self.start_time = time.time() + self.last_update_time = time.time() + self.test_phase = "Initial Discharge" + self.phase_label.setText(self.test_phase) + + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + self.status_bar.showMessage(f"Test started | Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A") + + # Start test sequence in a QThread + self.test_sequence_thread = QThread() + self.test_sequence_worker = TestSequenceWorker( + self.dev, + test_current, + self.charge_cutoff, + self.discharge_cutoff, + self.rest_time, + self.continuous_mode_check.isChecked(), + self # Pass reference to main window for callbacks + ) + self.test_sequence_worker.moveToThread(self.test_sequence_thread) + + # Connect signals + self.test_sequence_worker.update_phase.connect(self.update_test_phase) + self.test_sequence_worker.update_status.connect(self.status_bar.showMessage) + self.test_sequence_worker.test_completed.connect(self.finalize_test) + self.test_sequence_worker.error_occurred.connect(self.handle_test_error) + self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit) + self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater) + self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater) + + # Start the thread and the worker's run method + self.test_sequence_thread.start() + QTimer.singleShot(0, self.test_sequence_worker.run) + + # Start capacity calculation timer if not already running + if not self.status_timer.isActive(): + self.status_timer.start(1000) + + except Exception as e: + QMessageBox.critical(self, "Error", str(e)) + # Ensure buttons are in correct state if error occurs + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) + + def create_cycle_log_file(self): + """Create a new log file for the current cycle""" + try: + # Close previous file if exists + if hasattr(self, 'current_cycle_file') and self.current_cycle_file: + try: + self.current_cycle_file.close() + except Exception as e: + print(f"Error closing previous log file: {e}") + + # Ensure log directory exists + os.makedirs(self.log_dir, exist_ok=True) + + if not os.access(self.log_dir, os.W_OK): + QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}") + return False + + # Generate unique filename + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv") + + # Open new file + try: + self.current_cycle_file = open(self.filename, 'w', newline='') + self.log_writer = csv.writer(self.current_cycle_file) + self.log_writer.writerow([ + "Time(s)", "Voltage(V)", "Current(A)", "Phase", + "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", + "Coulomb_Eff(%)", "Cycle" + ]) + self.log_buffer = [] + return True + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to create log file: {e}") + return False + except Exception as e: + print(f"Error in create_cycle_log_file: {e}") + return False + + def format_time(self, seconds): + """Convert seconds to hh:mm:ss format""" + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + seconds = int(seconds % 60) + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" + + def stop_test(self): + """Request immediate stop of the test""" + if not self.test_running: + return + + self.request_stop = True + self.test_running = False + self.measuring = False + self.test_phase = "Idle" + self.phase_label.setText(self.test_phase) + + if hasattr(self, 'dev'): + try: + self.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.dev.channels['A'].constant(0) + except Exception as e: + print(f"Error resetting device: {e}") + + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + self.phase_data.clear() + + self.capacity_ah = 0.0 + self.charge_capacity = 0.0 + self.coulomb_efficiency = 0.0 + + self.reset_plot() + + self.status_bar.showMessage("Test stopped - Ready for new test") + self.stop_button.setEnabled(False) + self.start_button.setEnabled(True) + + self.finalize_test() + + def finalize_test(self): + """Final cleanup after test completes or is stopped""" + self.measuring = False + if hasattr(self, 'dev'): + try: + self.dev.channels['A'].constant(0) + except Exception as e: + print(f"Error resetting device: {e}") + + # Only try to close if file exists and is open + if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None: + try: + if self.log_buffer: + self.log_writer.writerows(self.log_buffer) + self.log_buffer.clear() + self.current_cycle_file.close() + except Exception as e: + print(f"Error closing log file: {e}") + finally: + self.current_cycle_file = None + + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) + self.request_stop = False + + message = ( + f"Test safely stopped after discharge phase | " + f"Cycle {self.cycle_count} completed | " + f"Final capacity: {self.capacity_ah:.3f}Ah" + ) + self.status_bar.showMessage(message) + + QMessageBox.information( + self, + "Test Completed", + f"Test was safely stopped after discharge phase.\n\n" + f"Final discharge capacity: {self.capacity_ah:.3f}Ah\n" + f"Total cycles completed: {self.cycle_count}" + ) + + def reset_plot(self): + """Reset the plot completely for a new test""" + self.line_voltage.set_data([], []) + self.line_current.set_data([], []) + + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + + voltage_padding = 0.2 + min_voltage = max(0, self.discharge_cutoff - voltage_padding) + max_voltage = self.charge_cutoff + voltage_padding + self.ax.set_xlim(0, 10) + self.ax.set_ylim(min_voltage, max_voltage) + + current_padding = 0.05 + test_current = self.c_rate * self.capacity + max_current = test_current * 1.5 + self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) + + self.canvas.draw() + + def write_cycle_summary(self): + """Write cycle summary to the current cycle's log file""" + if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file: + return + + summary_line = ( + f"Cycle {self.cycle_count} Summary - " + f"Discharge={self.capacity_ah:.4f}Ah, " + f"Charge={self.charge_capacity:.4f}Ah, " + f"Efficiency={self.coulomb_efficiency:.1f}%" + ) + + try: + if self.log_buffer: + self.log_writer.writerows(self.log_buffer) + self.log_buffer.clear() + self.current_cycle_file.write(summary_line + "\n") + self.current_cycle_file.flush() + except Exception as e: + print(f"Error writing cycle summary: {e}") + + def update_plot(self): + """More reliable plotting with better error handling""" + try: + # Create local copies safely + with self.plot_mutex: + if not self.time_data or not self.voltage_data or not self.current_data: + return + + if len(self.time_data) != len(self.voltage_data) or len(self.time_data) != len(self.current_data): + # Find the minimum length to avoid mismatch + min_len = min(len(self.time_data), len(self.voltage_data), len(self.current_data)) + x_data = np.array(self.time_data[-min_len:]) + y1_data = np.array(self.voltage_data[-min_len:]) + y2_data = np.array(self.current_data[-min_len:]) + else: + x_data = np.array(self.time_data) + y1_data = np.array(self.voltage_data) + y2_data = np.array(self.current_data) + + # Update plot data + self.line_voltage.set_data(x_data, y1_data) + self.line_current.set_data(x_data, y2_data) + + # Auto-scale when needed + if len(x_data) > 0 and x_data[-1] > self.ax.get_xlim()[1] * 0.8: + self.auto_scale_axes() + + # Force redraw + self.canvas.draw_idle() + + except Exception as e: + print(f"Plot error: {e}") + # Reset plot on error + self.line_voltage.set_data([], []) + self.line_current.set_data([], []) + self.canvas.draw_idle() + + def auto_scale_axes(self): + """Auto-scale plot axes with appropriate padding and strict boundaries""" + if not self.time_data: + return + + min_time = 0 + max_time = self.time_data[-1] + current_xlim = self.ax.get_xlim() + + if max_time > current_xlim[1] * 0.95: + new_max = max_time * 1.05 + self.ax.set_xlim(min_time, new_max) + self.ax2.set_xlim(min_time, new_max) + + voltage_padding = 0.2 + if self.voltage_data: + min_voltage = max(0, min(self.voltage_data) - voltage_padding) + max_voltage = min(5.0, max(self.voltage_data) + voltage_padding) + current_ylim = self.ax.get_ylim() + if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1): + self.ax.set_ylim(min_voltage, max_voltage) + + current_padding = 0.05 + if self.current_data: + min_current = max(-0.25, min(self.current_data) - current_padding) + max_current = min(0.25, max(self.current_data) + current_padding) + current_ylim2 = self.ax2.get_ylim() + if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02): + self.ax2.set_ylim(min_current, max_current) + + @pyqtSlot(str) + def handle_device_error(self, error): + """Handle device connection errors""" + error_msg = str(error) + print(f"Device error: {error_msg}") + + if hasattr(self, 'session'): + try: + if self.session_active: + self.session.end() + del self.session + except Exception as e: + print(f"Error cleaning up session: {e}") + + self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;") + self.connection_label.setText("Disconnected") + self.status_bar.showMessage(f"Device error: {error_msg}") + + self.session_active = False + self.test_running = False + self.continuous_mode = False + self.measuring = False + + self.start_button.setEnabled(False) + self.stop_button.setEnabled(False) + + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + @pyqtSlot(str) + def update_test_phase(self, phase_text): + """Update the test phase display""" + self.test_phase = phase_text + self.phase_label.setText(phase_text) + + # Update log if available + if hasattr(self, 'log_buffer'): + current_time = time.time() - self.start_time + self.log_buffer.append([ + f"{current_time:.3f}", + "", + "", + phase_text, + f"{self.capacity_ah:.4f}", + f"{self.charge_capacity:.4f}", + f"{self.coulomb_efficiency:.1f}" if hasattr(self, 'coulomb_efficiency') else "0.0", + f"{self.cycle_count}" + ]) + + @pyqtSlot(str) + def handle_test_error(self, error_msg): + """Handle errors from the test sequence with complete cleanup""" + try: + # 1. Notify user + QMessageBox.critical(self, "Test Error", + f"An error occurred:\n{error_msg}\n\nAttempting to recover...") + + # 2. Stop all operations + self.stop_test() + + # 3. Reset UI elements + if hasattr(self, 'line_voltage'): + try: + self.line_voltage.set_data([], []) + self.line_current.set_data([], []) + self.ax.set_xlim(0, 1) + self.ax2.set_xlim(0, 1) + self.canvas.draw() + except Exception as plot_error: + print(f"Plot reset error: {plot_error}") + + # 4. Update status + self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...") + self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") + + # 5. Attempt recovery + QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect + + except Exception as e: + print(f"Error in error handler: {e}") + # Fallback - restart application? + QMessageBox.critical(self, "Fatal Error", + "The application needs to restart due to an unrecoverable error") + QTimer.singleShot(1000, self.close) + + def attempt_reconnect(self): + """Attempt to reconnect automatically""" + QMessageBox.critical( + self, + "Device Connection Error", + "Could not connect to ADALM1000\n\n" + "1. Check USB cable connection\n" + "2. The device will attempt to reconnect automatically" + ) + + QTimer.singleShot(1000, self.reconnect_device) + + def reconnect_device(self): + """Reconnect the device with proper cleanup""" + self.status_bar.showMessage("Attempting to reconnect...") + + if hasattr(self, 'session'): + try: + if self.session_active: + self.session.end() + del self.session + except: + pass + + self.test_running = False + self.continuous_mode = False + self.measuring = False + + if hasattr(self, 'measurement_thread'): + self.measurement_thread.stop() + + time.sleep(1.5) + + try: + self.init_device() + if self.session_active: + self.status_bar.showMessage("Reconnected successfully") + return + except Exception as e: + print(f"Reconnect failed: {e}") + + self.status_bar.showMessage("Reconnect failed - will retry...") + QTimer.singleShot(2000, self.reconnect_device) + + def closeEvent(self, event): + """Clean up on window close""" + self.test_running = False + self.measuring = False + self.session_active = False + + # Stop measurement thread + if hasattr(self, 'measurement_thread'): + self.measurement_thread.stop() + + # Stop test sequence thread + if hasattr(self, 'test_sequence_thread'): + if hasattr(self, 'test_sequence_worker'): + self.test_sequence_worker.stop() + self.test_sequence_thread.quit() + self.test_sequence_thread.wait(500) + + # Clean up device session + if hasattr(self, 'session') and self.session: + try: + self.session.end() + except Exception as e: + print(f"Error ending session: {e}") + + event.accept() + +if __name__ == "__main__": + app = QApplication([]) try: - # Configure both channels properly - self.device.channels['B'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].mode = pysmu.Mode.SIMV - self.device.channels['A'].sink = True # Ensure sinking current - self.device.channels['A'].constant(-self.test_current) - - # Small delay to allow current to stabilize - time.sleep(0.1) - - while self._running: - voltage, current = self.get_latest_measurement() - if voltage is None: - continue - - # Update parent's data for logging/display - with self.parent.plot_mutex: - if len(self.parent.voltage_data) > 0: - self.parent.voltage_data[-1] = voltage - self.parent.current_data[-1] = current - - if voltage <= self.discharge_cutoff: - break - - time.sleep(0.1) - - finally: - self.device.channels['A'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].constant(0) \ No newline at end of file + window = BatteryTester() + window.show() + app.exec_() + except Exception as e: + QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}") \ No newline at end of file