From 9746ca12b3257f8cfb9aec373c6ba9747ea72ceb Mon Sep 17 00:00:00 2001 From: Jan Date: Thu, 3 Jul 2025 18:07:32 +0200 Subject: [PATCH] MainCode/adalm1000_logger.py aktualisiert Sourcing und sinking current doesnt work. --- MainCode/adalm1000_logger.py | 1018 +--------------------------------- 1 file changed, 25 insertions(+), 993 deletions(-) diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py index af1b068..cee0173 100644 --- a/MainCode/adalm1000_logger.py +++ b/MainCode/adalm1000_logger.py @@ -1,118 +1,18 @@ -# -*- coding: utf-8 -*- -import os -import time -import csv -import threading -from datetime import datetime -import numpy as np -import matplotlib -matplotlib.use('Qt5Agg') -from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas -from matplotlib.figure import Figure -from collections import deque -from queue import Queue, Full, Empty - -from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, - QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog) -from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread -import pysmu - -class DeviceDisconnectedError(Exception): - pass - -class MeasurementThread(QThread): - update_signal = pyqtSignal(float, float, float) - error_signal = pyqtSignal(str) - - def __init__(self, device, interval=0.1): - super().__init__() - self.device = device - self.interval = interval - self._running = False - self.filter_window_size = 10 - self.voltage_window = [] - self.current_window = [] - self.start_time = time.time() - self.measurement_queue = Queue(maxsize=1) - - def run(self): - self._running = True - while self._running: - try: - samples = self.device.read(self.filter_window_size, 500, True) - if not samples: - raise DeviceDisconnectedError("No samples received") - - raw_voltage = np.mean([s[1][0] for s in samples]) - raw_current = np.mean([s[0][1] for s in samples]) - current_time = time.time() - self.start_time - - self.voltage_window.append(raw_voltage) - self.current_window.append(raw_current) - - if len(self.voltage_window) > self.filter_window_size: - self.voltage_window.pop(0) - self.current_window.pop(0) - - voltage = np.mean(self.voltage_window) - current = np.mean(self.current_window) - - self.update_signal.emit(voltage, current, current_time) - - # Store the latest measurement in the queue - try: - self.measurement_queue.put_nowait((voltage, current)) - except Full: - pass - - time.sleep(max(0.05, self.interval)) - - except Exception as e: - self.error_signal.emit(f"Read error: {str(e)}") - time.sleep(1) - continue - - def stop(self): - self._running = False - self.wait(500) - -class TestSequenceWorker(QObject): - finished = pyqtSignal() - update_phase = pyqtSignal(str) - update_status = pyqtSignal(str) - test_completed = pyqtSignal() - error_occurred = pyqtSignal(str) +def charge_phase(self): + """Handle the battery charging phase""" + self.update_phase.emit("Charge") + self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.3f}A") - def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent): - super().__init__() - self.device = device - self.test_current = test_current - self.charge_cutoff = charge_cutoff - self.discharge_cutoff = discharge_cutoff - self.rest_time = rest_time * 3600 # Convert hours to seconds - self.continuous_mode = continuous_mode - self.parent = parent - self._running = True - self.voltage_timeout = 0.5 # seconds - - def get_latest_measurement(self): - """Thread-safe measurement reading with timeout""" - try: - return self.parent.measurement_thread.measurement_queue.get( - timeout=self.voltage_timeout - ) - except Empty: - return (None, None) # Return tuple for unpacking - - def charge_phase(self): - """Handle the battery charging phase""" - self.update_phase.emit("Charge") - self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.3f}A") - + try: + # Configure both channels properly self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].sink = False # Ensure sourcing current self.device.channels['A'].constant(self.test_current) + # Small delay to allow current to stabilize + time.sleep(0.1) + while self._running: voltage, current = self.get_latest_measurement() if voltage is None: @@ -129,17 +29,25 @@ class TestSequenceWorker(QObject): time.sleep(0.1) + finally: self.device.channels['A'].mode = pysmu.Mode.HI_Z self.device.channels['A'].constant(0) - def discharge_phase(self): - """Handle the battery discharging phase""" - self.update_phase.emit("Discharge") - self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A") - +def discharge_phase(self): + """Handle the battery discharging phase""" + self.update_phase.emit("Discharge") + self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A") + + try: + # Configure both channels properly + self.device.channels['B'].mode = pysmu.Mode.HI_Z self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].sink = True # Ensure sinking current self.device.channels['A'].constant(-self.test_current) + # Small delay to allow current to stabilize + time.sleep(0.1) + while self._running: voltage, current = self.get_latest_measurement() if voltage is None: @@ -156,882 +64,6 @@ class TestSequenceWorker(QObject): time.sleep(0.1) + finally: self.device.channels['A'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].constant(0) - - def rest_phase(self, phase_name): - """Handle rest period between phases""" - self.update_phase.emit(f"Resting ({phase_name})") - rest_end = time.time() + self.rest_time - - while time.time() < rest_end and self._running: - time_left = max(0, rest_end - time.time()) - self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min") - time.sleep(1) - - def stop(self): - """Request the thread to stop""" - self._running = False - self.device.channels['A'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].constant(0) - -class BatteryTester(QMainWindow): - def __init__(self): - self.plot_mutex = threading.Lock() - super().__init__() - - # Color scheme - self.bg_color = "#2E3440" - self.fg_color = "#D8DEE9" - self.accent_color = "#5E81AC" - self.warning_color = "#BF616A" - self.success_color = "#A3BE8C" - - # Device and measurement state - self.session_active = False - self.measuring = False - self.test_running = False - self.continuous_mode = False - self.request_stop = False - self.interval = 0.1 - self.log_dir = os.path.expanduser("~/adalm1000/logs") - os.makedirs(self.log_dir, exist_ok=True) - - # Data buffers - self.time_data = deque() - self.voltage_data = deque() - self.current_data = deque() - self.phase_data = deque() - - # Initialize UI and device - self.setup_ui() - self.init_device() - - # Set window properties - self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)") - self.resize(1000, 800) - self.setMinimumSize(800, 700) - - # Status update timer - self.status_timer = QTimer() - self.status_timer.timeout.connect(self.update_status) - self.status_timer.start(1000) # Update every second - - def setup_ui(self): - """Configure the user interface""" - # Main widget and layout - self.central_widget = QWidget() - self.setCentralWidget(self.central_widget) - self.main_layout = QVBoxLayout(self.central_widget) - self.main_layout.setContentsMargins(10, 10, 10, 10) - - # Header area - header_frame = QFrame() - header_frame.setFrameShape(QFrame.NoFrame) - header_layout = QHBoxLayout(header_frame) - header_layout.setContentsMargins(0, 0, 0, 0) - - self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)") - self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};") - header_layout.addWidget(self.title_label, 1) - - # Status indicator - self.status_light = QLabel() - self.status_light.setFixedSize(20, 20) - self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") - header_layout.addWidget(self.status_light) - - self.connection_label = QLabel("Disconnected") - header_layout.addWidget(self.connection_label) - - # Reconnect button - self.reconnect_btn = QPushButton("Reconnect") - self.reconnect_btn.clicked.connect(self.reconnect_device) - header_layout.addWidget(self.reconnect_btn) - - self.main_layout.addWidget(header_frame) - - # Measurement display - display_frame = QFrame() - display_frame.setFrameShape(QFrame.StyledPanel) - display_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") - display_layout = QGridLayout(display_frame) - - # Measurement values - measurement_labels = [ - ("Voltage (V)", "V"), - ("Current (A)", "A"), - ("Test Phase", ""), - ("Elapsed Time", "s"), - ("Discharge Capacity", "Ah"), - ("Charge Capacity", "Ah"), - ("Coulomb Eff.", "%"), - ("Cycle Count", ""), - ] - - self.value_labels = {} - for i, (label, unit) in enumerate(measurement_labels): - row = i // 2 - col = (i % 2) * 2 - - lbl = QLabel(f"{label}:") - lbl.setStyleSheet(f"color: {self.fg_color};") - display_layout.addWidget(lbl, row, col) - - value_lbl = QLabel("0.000") - value_lbl.setStyleSheet(f"color: {self.fg_color}; font-weight: bold;") - display_layout.addWidget(value_lbl, row, col + 1) - - if unit: - unit_lbl = QLabel(unit) - unit_lbl.setStyleSheet(f"color: {self.fg_color};") - display_layout.addWidget(unit_lbl, row, col + 2) - - # Store references to important labels - if i == 0: - self.voltage_label = value_lbl - elif i == 1: - self.current_label = value_lbl - elif i == 2: - self.phase_label = value_lbl - elif i == 3: - self.time_label = value_lbl - elif i == 4: - self.capacity_label = value_lbl - elif i == 5: - self.charge_capacity_label = value_lbl - elif i == 6: - self.efficiency_label = value_lbl - elif i == 7: - self.cycle_label = value_lbl - - self.main_layout.addWidget(display_frame) - - # Control area - controls_frame = QFrame() - controls_frame.setFrameShape(QFrame.NoFrame) - controls_layout = QHBoxLayout(controls_frame) - controls_layout.setContentsMargins(0, 0, 0, 0) - - # Parameters frame - params_frame = QFrame() - params_frame.setFrameShape(QFrame.StyledPanel) - params_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") - params_layout = QGridLayout(params_frame) - - # Battery capacity - self.capacity = 0.2 - self.capacity_label_input = QLabel("Battery Capacity (Ah):") - self.capacity_label_input.setStyleSheet(f"color: {self.fg_color};") - params_layout.addWidget(self.capacity_label_input, 0, 0) - self.capacity_input = QLineEdit("0.2") - self.capacity_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") - self.capacity_input.setFixedWidth(60) - params_layout.addWidget(self.capacity_input, 0, 1) - - # Charge cutoff - self.charge_cutoff = 1.43 - self.charge_cutoff_label = QLabel("Charge Cutoff (V):") - self.charge_cutoff_label.setStyleSheet(f"color: {self.fg_color};") - params_layout.addWidget(self.charge_cutoff_label, 1, 0) - self.charge_cutoff_input = QLineEdit("1.43") - self.charge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") - self.charge_cutoff_input.setFixedWidth(60) - params_layout.addWidget(self.charge_cutoff_input, 1, 1) - - # Discharge cutoff - self.discharge_cutoff = 0.9 - self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):") - self.discharge_cutoff_label.setStyleSheet(f"color: {self.fg_color};") - params_layout.addWidget(self.discharge_cutoff_label, 2, 0) - self.discharge_cutoff_input = QLineEdit("0.9") - self.discharge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") - self.discharge_cutoff_input.setFixedWidth(60) - params_layout.addWidget(self.discharge_cutoff_input, 2, 1) - - # Rest time - self.rest_time = 0.25 - self.rest_time_label = QLabel("Rest Time (hours):") - self.rest_time_label.setStyleSheet(f"color: {self.fg_color};") - params_layout.addWidget(self.rest_time_label, 3, 0) - self.rest_time_input = QLineEdit("0.25") - self.rest_time_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") - self.rest_time_input.setFixedWidth(60) - params_layout.addWidget(self.rest_time_input, 3, 1) - - # C-rate for test - self.c_rate = 0.1 - self.c_rate_label = QLabel("Test C-rate:") - self.c_rate_label.setStyleSheet(f"color: {self.fg_color};") - params_layout.addWidget(self.c_rate_label, 0, 2) - self.c_rate_input = QLineEdit("0.1") - self.c_rate_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") - self.c_rate_input.setFixedWidth(40) - params_layout.addWidget(self.c_rate_input, 0, 3) - - c_rate_note = QLabel("(e.g., 0.2 for C/5)") - c_rate_note.setStyleSheet(f"color: {self.fg_color};") - params_layout.addWidget(c_rate_note, 0, 4) - - controls_layout.addWidget(params_frame, 1) - - # Button frame - button_frame = QFrame() - button_frame.setFrameShape(QFrame.NoFrame) - button_layout = QVBoxLayout(button_frame) - button_layout.setContentsMargins(0, 0, 0, 0) - - self.start_button = QPushButton("START TEST") - self.start_button.setStyleSheet(f""" - QPushButton {{ - background-color: {self.accent_color}; - color: {self.fg_color}; - font-weight: bold; - padding: 6px; - border-radius: 4px; - }} - QPushButton:disabled {{ - background-color: #4C566A; - color: #D8DEE9; - }} - """) - self.start_button.clicked.connect(self.start_test) - button_layout.addWidget(self.start_button) - - self.stop_button = QPushButton("STOP TEST") - self.stop_button.setStyleSheet(f""" - QPushButton {{ - background-color: {self.warning_color}; - color: {self.fg_color}; - font-weight: bold; - padding: 6px; - border-radius: 4px; - }} - QPushButton:disabled {{ - background-color: #4C566A; - color: #D8DEE9; - }} - """) - self.stop_button.clicked.connect(self.stop_test) - self.stop_button.setEnabled(False) - button_layout.addWidget(self.stop_button) - - # Continuous mode checkbox - self.continuous_mode_check = QCheckBox("Continuous Mode") - self.continuous_mode_check.setChecked(True) - self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};") - button_layout.addWidget(self.continuous_mode_check) - - controls_layout.addWidget(button_frame) - self.main_layout.addWidget(controls_frame) - - # Plot area - self.setup_plot() - - # Status bar - self.status_bar = self.statusBar() - self.status_bar.setStyleSheet(f"color: {self.fg_color};") - self.status_bar.showMessage("Ready") - - # Apply dark theme - self.setStyleSheet(f""" - QMainWindow {{ - background-color: {self.bg_color}; - }} - QLabel {{ - color: {self.fg_color}; - }} - QLineEdit {{ - background-color: #3B4252; - color: {self.fg_color}; - border: 1px solid #4C566A; - border-radius: 3px; - padding: 2px; - }} - """) - - def setup_plot(self): - """Configure the matplotlib plot""" - self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color) - self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) - self.ax = self.fig.add_subplot(111) - self.ax.set_facecolor('#3B4252') - - # Set initial voltage range - voltage_padding = 0.2 - min_voltage = max(0, 0.9 - voltage_padding) - max_voltage = 1.43 + voltage_padding - self.ax.set_ylim(min_voltage, max_voltage) - - # Voltage plot - self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) - self.ax.set_ylabel("Voltage (V)", color='#00BFFF') - self.ax.tick_params(axis='y', labelcolor='#00BFFF') - - # Current plot (right axis) - self.ax2 = self.ax.twinx() - current_padding = 0.05 - test_current = 0.1 * 0.2 # Default values - max_current = test_current * 1.5 - self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) - - self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) - self.ax2.set_ylabel("Current (A)", color='r') - self.ax2.tick_params(axis='y', labelcolor='r') - - self.ax.set_xlabel('Time (s)', color=self.fg_color) - self.ax.set_title('Battery Test (CC)', color=self.fg_color) - self.ax.tick_params(axis='x', colors=self.fg_color) - self.ax.grid(True, color='#4C566A') - - # Position legends - self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) - self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) - - # Embed plot - self.canvas = FigureCanvas(self.fig) - self.canvas.setStyleSheet(f"background-color: {self.bg_color};") - self.main_layout.addWidget(self.canvas, 1) - - def init_device(self): - """Initialize the ADALM1000 device with continuous measurement""" - try: - # Clean up any existing session - if hasattr(self, 'session'): - try: - self.session.end() - del self.session - except: - pass - - time.sleep(1) - - self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) - if not self.session.devices: - raise Exception("No ADALM1000 detected - check connections") - - self.dev = self.session.devices[0] - self.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.dev.channels['B'].mode = pysmu.Mode.HI_Z - self.dev.channels['A'].constant(0) - self.dev.channels['B'].constant(0) - - self.session.start(0) - - self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;") - self.connection_label.setText("Connected") - self.status_bar.showMessage("Device connected | Ready to measure") - self.session_active = True - self.start_button.setEnabled(True) - - # Start measurement thread - self.measurement_thread = MeasurementThread(self.dev, self.interval) - self.measurement_thread.update_signal.connect(self.update_measurements) - self.measurement_thread.error_signal.connect(self.handle_device_error) - - # Start the QThread directly (no need for threading.Thread) - self.measurement_thread.start() - - except Exception as e: - self.handle_device_error(str(e)) - - @pyqtSlot(float, float, float) - def update_measurements(self, voltage, current, current_time): - """Update measurements from the measurement thread""" - self.time_data.append(current_time) - self.voltage_data.append(voltage) - self.current_data.append(current) - - # Update display - self.voltage_label.setText(f"{voltage:.4f}") - self.current_label.setText(f"{current:.4f}") - self.time_label.setText(self.format_time(current_time)) - - # Throttle plot updates to avoid recursive repaint - now = time.time() - if not hasattr(self, '_last_plot_update'): - self._last_plot_update = 0 - - if now - self._last_plot_update > 0.1: # Update plot max 10 times per second - self._last_plot_update = now - QTimer.singleShot(0, self.update_plot) - - def update_status(self): - """Update status information periodically""" - if self.test_running: - # Update capacity calculations if in test mode - if self.measuring and self.time_data: - current_time = time.time() - self.start_time - delta_t = current_time - self.last_update_time - self.last_update_time = current_time - - if self.test_phase == "Discharge": - current_current = abs(self.current_data[-1]) - self.capacity_ah += current_current * delta_t / 3600 - self.capacity_label.setText(f"{self.capacity_ah:.4f}") - elif self.test_phase == "Charge": - current_current = abs(self.current_data[-1]) - self.charge_capacity += current_current * delta_t / 3600 - self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}") - - def start_test(self): - """Start the full battery test cycle""" - if not self.test_running: - try: - # Get parameters from UI - self.capacity = float(self.capacity_input.text()) - self.charge_cutoff = float(self.charge_cutoff_input.text()) - self.discharge_cutoff = float(self.discharge_cutoff_input.text()) - self.rest_time = float(self.rest_time_input.text()) - self.c_rate = float(self.c_rate_input.text()) - - # Validate inputs - if self.capacity <= 0: - raise ValueError("Battery capacity must be positive") - if self.charge_cutoff <= self.discharge_cutoff: - raise ValueError("Charge cutoff must be higher than discharge cutoff") - if self.c_rate <= 0: - raise ValueError("C-rate must be positive") - - test_current = self.c_rate * self.capacity - if test_current > 0.2: - raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") - - # Clear previous data - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - self.phase_data.clear() - self.capacity_ah = 0.0 - self.charge_capacity = 0.0 - self.coulomb_efficiency = 0.0 - self.cycle_count = 0 - - # Reset plot - self.reset_plot() - - # Generate filename - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}") - self.current_cycle_file = None - - # Start test - self.test_running = True - self.start_time = time.time() - self.last_update_time = time.time() - self.test_phase = "Initial Discharge" - self.phase_label.setText(self.test_phase) - - self.start_button.setEnabled(False) - self.stop_button.setEnabled(True) - self.status_bar.showMessage(f"Test started | Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A") - - # Start test sequence in a QThread - self.test_sequence_thread = QThread() - self.test_sequence_worker = TestSequenceWorker( - self.dev, - test_current, - self.charge_cutoff, - self.discharge_cutoff, - self.rest_time, - self.continuous_mode_check.isChecked(), - self # Pass reference to main window for callbacks - ) - self.test_sequence_worker.moveToThread(self.test_sequence_thread) - - # Connect signals - self.test_sequence_worker.update_phase.connect(self.update_test_phase) - self.test_sequence_worker.update_status.connect(self.status_bar.showMessage) - self.test_sequence_worker.test_completed.connect(self.finalize_test) - self.test_sequence_worker.error_occurred.connect(self.handle_test_error) - self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit) - self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater) - self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater) - - self.test_sequence_thread.start() - - except Exception as e: - QMessageBox.critical(self, "Error", str(e)) - - def create_cycle_log_file(self): - """Create a new log file for the current cycle""" - if hasattr(self, 'current_cycle_file') and self.current_cycle_file: - try: - self.current_cycle_file.close() - except Exception as e: - print(f"Error closing previous log file: {e}") - - if not os.access(self.log_dir, os.W_OK): - QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}") - return False - - suffix = 1 - while True: - self.filename = f"{self.base_filename}_{suffix}.csv" - if not os.path.exists(self.filename): - break - suffix += 1 - - try: - self.current_cycle_file = open(self.filename, 'w', newline='') - self.log_writer = csv.writer(self.current_cycle_file) - self.log_writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase", - "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", - "Coulomb_Eff(%)", "Cycle"]) - self.log_buffer = [] - return True - except Exception as e: - QMessageBox.critical(self, "Error", f"Failed to create log file: {e}") - return False - - def format_time(self, seconds): - """Convert seconds to hh:mm:ss format""" - hours = int(seconds // 3600) - minutes = int((seconds % 3600) // 60) - seconds = int(seconds % 60) - return f"{hours:02d}:{minutes:02d}:{seconds:02d}" - - def stop_test(self): - """Request immediate stop of the test""" - if not self.test_running: - return - - self.request_stop = True - self.test_running = False - self.measuring = False - self.test_phase = "Idle" - self.phase_label.setText(self.test_phase) - - if hasattr(self, 'dev'): - try: - self.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.dev.channels['A'].constant(0) - except Exception as e: - print(f"Error resetting device: {e}") - - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - self.phase_data.clear() - - self.capacity_ah = 0.0 - self.charge_capacity = 0.0 - self.coulomb_efficiency = 0.0 - - self.reset_plot() - - self.status_bar.showMessage("Test stopped - Ready for new test") - self.stop_button.setEnabled(False) - self.start_button.setEnabled(True) - - self.finalize_test() - - def finalize_test(self): - """Final cleanup after test completes or is stopped""" - self.measuring = False - if hasattr(self, 'dev'): - try: - self.dev.channels['A'].constant(0) - except Exception as e: - print(f"Error resetting device: {e}") - - if hasattr(self, 'log_buffer') and self.log_buffer and hasattr(self, 'log_writer'): - try: - self.log_writer.writerows(self.log_buffer) - self.log_buffer.clear() - except Exception as e: - print(f"Error flushing log buffer: {e}") - - if hasattr(self, 'current_cycle_file'): - try: - self.current_cycle_file.close() - except Exception as e: - print(f"Error closing log file: {e}") - - self.start_button.setEnabled(True) - self.stop_button.setEnabled(False) - self.request_stop = False - - message = ( - f"Test safely stopped after discharge phase | " - f"Cycle {self.cycle_count} completed | " - f"Final capacity: {self.capacity_ah:.3f}Ah" - ) - self.status_bar.showMessage(message) - - QMessageBox.information( - self, - "Test Completed", - f"Test was safely stopped after discharge phase.\n\n" - f"Final discharge capacity: {self.capacity_ah:.3f}Ah\n" - f"Total cycles completed: {self.cycle_count}" - ) - - def reset_plot(self): - """Reset the plot completely for a new test""" - self.line_voltage.set_data([], []) - self.line_current.set_data([], []) - - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - - voltage_padding = 0.2 - min_voltage = max(0, self.discharge_cutoff - voltage_padding) - max_voltage = self.charge_cutoff + voltage_padding - self.ax.set_xlim(0, 10) - self.ax.set_ylim(min_voltage, max_voltage) - - current_padding = 0.05 - test_current = self.c_rate * self.capacity - max_current = test_current * 1.5 - self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) - - self.canvas.draw() - - def write_cycle_summary(self): - """Write cycle summary to the current cycle's log file""" - if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file: - return - - summary_line = ( - f"Cycle {self.cycle_count} Summary - " - f"Discharge={self.capacity_ah:.4f}Ah, " - f"Charge={self.charge_capacity:.4f}Ah, " - f"Efficiency={self.coulomb_efficiency:.1f}%" - ) - - try: - if self.log_buffer: - self.log_writer.writerows(self.log_buffer) - self.log_buffer.clear() - self.current_cycle_file.write(summary_line + "\n") - self.current_cycle_file.flush() - except Exception as e: - print(f"Error writing cycle summary: {e}") - - def update_plot(self): - """More reliable plotting with better error handling""" - if not self.time_data or len(self.time_data) != len(self.voltage_data): - print("Plot: No data or mismatched lengths") # Debug - return - - try: - # Create local copies quickly - with self.plot_mutex: - x_data = np.array(self.time_data) - y1_data = np.array(self.voltage_data) - y2_data = np.array(self.current_data) - - # Update plot data - self.line_voltage.set_data(x_data, y1_data) - self.line_current.set_data(x_data, y2_data) - - # Only auto-scale when needed - if x_data[-1] > self.ax.get_xlim()[1] * 0.8: - self.auto_scale_axes() - - # Force redraw - self.canvas.draw_idle() - - except Exception as e: - print(f"Plot error: {e}") - # Reset plot on error - self.line_voltage.set_data([], []) - self.line_current.set_data([], []) - self.canvas.draw_idle() - - def auto_scale_axes(self): - """Auto-scale plot axes with appropriate padding and strict boundaries""" - if not self.time_data: - return - - min_time = 0 - max_time = self.time_data[-1] - current_xlim = self.ax.get_xlim() - - if max_time > current_xlim[1] * 0.95: - new_max = max_time * 1.05 - self.ax.set_xlim(min_time, new_max) - self.ax2.set_xlim(min_time, new_max) - - voltage_padding = 0.2 - if self.voltage_data: - min_voltage = max(0, min(self.voltage_data) - voltage_padding) - max_voltage = min(5.0, max(self.voltage_data) + voltage_padding) - current_ylim = self.ax.get_ylim() - if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1): - self.ax.set_ylim(min_voltage, max_voltage) - - current_padding = 0.05 - if self.current_data: - min_current = max(-0.25, min(self.current_data) - current_padding) - max_current = min(0.25, max(self.current_data) + current_padding) - current_ylim2 = self.ax2.get_ylim() - if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02): - self.ax2.set_ylim(min_current, max_current) - - @pyqtSlot(str) - def handle_device_error(self, error): - """Handle device connection errors""" - error_msg = str(error) - print(f"Device error: {error_msg}") - - if hasattr(self, 'session'): - try: - if self.session_active: - self.session.end() - del self.session - except Exception as e: - print(f"Error cleaning up session: {e}") - - self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;") - self.connection_label.setText("Disconnected") - self.status_bar.showMessage(f"Device error: {error_msg}") - - self.session_active = False - self.test_running = False - self.continuous_mode = False - self.measuring = False - - self.start_button.setEnabled(False) - self.stop_button.setEnabled(False) - - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - - @pyqtSlot(str) - def update_test_phase(self, phase_text): - """Update the test phase display""" - self.test_phase = phase_text - self.phase_label.setText(phase_text) - - # Update log if available - if hasattr(self, 'log_buffer'): - current_time = time.time() - self.start_time - self.log_buffer.append([ - f"{current_time:.3f}", - "", - "", - phase_text, - f"{self.capacity_ah:.4f}", - f"{self.charge_capacity:.4f}", - f"{self.coulomb_efficiency:.1f}" if hasattr(self, 'coulomb_efficiency') else "0.0", - f"{self.cycle_count}" - ]) - - @pyqtSlot(str) - def handle_test_error(self, error_msg): - """Handle errors from the test sequence with complete cleanup""" - try: - # 1. Notify user - QMessageBox.critical(self, "Test Error", - f"An error occurred:\n{error_msg}\n\nAttempting to recover...") - - # 2. Stop all operations - self.stop_test() - - # 3. Reset UI elements - if hasattr(self, 'line_voltage'): - try: - self.line_voltage.set_data([], []) - self.line_current.set_data([], []) - self.ax.set_xlim(0, 1) - self.ax2.set_xlim(0, 1) - self.canvas.draw() - except Exception as plot_error: - print(f"Plot reset error: {plot_error}") - - # 4. Update status - self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...") - self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") - - # 5. Attempt recovery - QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect - - except Exception as e: - print(f"Error in error handler: {e}") - # Fallback - restart application? - QMessageBox.critical(self, "Fatal Error", - "The application needs to restart due to an unrecoverable error") - QTimer.singleShot(1000, self.close) - - def attempt_reconnect(self): - """Attempt to reconnect automatically""" - QMessageBox.critical( - self, - "Device Connection Error", - "Could not connect to ADALM1000\n\n" - "1. Check USB cable connection\n" - "2. The device will attempt to reconnect automatically" - ) - - QTimer.singleShot(1000, self.reconnect_device) - - def reconnect_device(self): - """Reconnect the device with proper cleanup""" - self.status_bar.showMessage("Attempting to reconnect...") - - if hasattr(self, 'session'): - try: - if self.session_active: - self.session.end() - del self.session - except: - pass - - self.test_running = False - self.continuous_mode = False - self.measuring = False - - if hasattr(self, 'measurement_thread'): - self.measurement_thread.stop() - - time.sleep(1.5) - - try: - self.init_device() - if self.session_active: - self.status_bar.showMessage("Reconnected successfully") - return - except Exception as e: - print(f"Reconnect failed: {e}") - - self.status_bar.showMessage("Reconnect failed - will retry...") - QTimer.singleShot(2000, self.reconnect_device) - - def closeEvent(self, event): - """Clean up on window close""" - self.test_running = False - self.measuring = False - self.session_active = False - - # Stop measurement thread - if hasattr(self, 'measurement_thread'): - self.measurement_thread.stop() - - # Stop test sequence thread - if hasattr(self, 'test_sequence_thread'): - if hasattr(self, 'test_sequence_worker'): - self.test_sequence_worker.stop() - self.test_sequence_thread.quit() - self.test_sequence_thread.wait(500) - - # Clean up device session - if hasattr(self, 'session') and self.session: - try: - self.session.end() - except Exception as e: - print(f"Error ending session: {e}") - - event.accept() - -if __name__ == "__main__": - app = QApplication([]) - try: - window = BatteryTester() - window.show() - app.exec_() - except Exception as e: - QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}") \ No newline at end of file + self.device.channels['A'].constant(0) \ No newline at end of file