From 5881ab7bb73fcb12d32eb6713959ab7e76715865 Mon Sep 17 00:00:00 2001 From: Jan Date: Mon, 30 Jun 2025 02:44:33 +0200 Subject: [PATCH] revert 72c25f7ed373f2e0f4d4c282fc162dbd80aba3ca MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit revert MainCode/adalm1000_logger.py gelöscht File "/home/jan/adalm1000/bin/adalm1000_logger.py", line 691 self.measurement_thread = MeasurementThread( ^ SyntaxError: '(' was never closed --- MainCode/adalm1000_logger.py | 1016 ++++++++++++++++++++++++++++++++++ 1 file changed, 1016 insertions(+) create mode 100644 MainCode/adalm1000_logger.py diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py new file mode 100644 index 0000000..9973b88 --- /dev/null +++ b/MainCode/adalm1000_logger.py @@ -0,0 +1,1016 @@ +# -*- coding: utf-8 -*- +import os +import time +import csv +from datetime import datetime +import numpy as np + +# Suppress warnings +os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false' +os.environ['LIBUSB_DEBUG'] = '0' + +from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, + QGridLayout, QLabel, QPushButton, QLineEdit, QFrame, + QCheckBox, QMessageBox, QFileDialog, QProgressBar) +from PyQt5.QtCore import (Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread, + QMutex, QMutexLocker) +from PyQt5.QtGui import QColor, QPalette +from collections import deque +import pysmu +import matplotlib +matplotlib.use('Qt5Agg') +from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas +from matplotlib.figure import Figure + + +class DeviceDisconnectedError(Exception): + """Custom exception for device disconnection events.""" + pass + + +class MeasurementThread(QThread): + """Thread for continuous measurement of voltage and current.""" + + update_signal = pyqtSignal(float, float, float) # voltage, current, timestamp + error_signal = pyqtSignal(str) + status_signal = pyqtSignal(str) + + def __init__(self, device: object, interval: float = 0.1, start_time: float = None): + """Initialize measurement thread.""" + super().__init__() + self.device = device + self.interval = max(0.05, interval) # Minimum interval + self._running = False + self._mutex = QMutex() # Thread safety + self.filter_window_size = 10 + self.start_time = time.time() + self.last_update_time = self.start_time + + def run(self): + """Main measurement loop with enhanced error handling.""" + self._running = True + voltage_window = deque() + current_window = deque() + + self.status_signal.emit("Measurement started") + + while self._running: + try: + # Read samples with timeout + samples = self.device.read( + self.filter_window_size, + timeout=500, + ) + + if not samples: + raise DeviceDisconnectedError("No samples received - device may be disconnected") + + # Process samples (thread-safe) + with QMutexLocker(self._mutex): + raw_voltage = np.mean([s[1][0] for s in samples]) + raw_current = np.mean([s[0][1] for s in samples]) + + if len(voltage_window) >= self.filter_window_size: + voltage_window.popleft() + current_window.popleft() + + voltage_window.append(raw_voltage) + current_window.append(raw_current) + + voltage = np.mean(list(voltage_window)) + current = np.mean(list(current_window)) + current_time = time.time() - self.start_time + + # Emit updates + self.update_signal.emit(voltage, current, current_time) + self.last_update_time = time.time() + + # Dynamic sleep adjustment + elapsed = time.time() - self.last_update_time + sleep_time = max(0.01, self.interval - elapsed) + time.sleep(sleep_time) + + except DeviceDisconnectedError as e: + self.error_signal.emit(f"Device error: {str(e)}") + break + except Exception as e: + self.error_signal.emit(f"Measurement error: {str(e)}") + self.status_signal.emit(f"Error: {str(e)}") + break + + self.status_signal.emit("Measurement stopped") + self._running = False + + def stop(self): + """Safe thread termination with timeout.""" + self._running = False + if self.isRunning(): + self.quit() + if not self.wait(300): # 300ms grace period + self.terminate() + + def is_active(self) -> bool: + """Check if thread is running and updating.""" + with QMutexLocker(self._mutex): + return self._running and (time.time() - self.last_update_time < 2.0) + + +class TestSequenceThread(QThread): + """Thread for executing battery test sequences.""" + + progress_updated = pyqtSignal(float, str) # progress, phase + cycle_completed = pyqtSignal(int, float, float, float) # cycle, discharge, charge, efficiency + error_occurred = pyqtSignal(str) + + def __init__(self, parent: QObject): + """Initialize test sequence thread. + + Args: + parent: Reference to main BatteryTester instance + """ + super().__init__() + self.parent = parent + self._mutex = QMutex() + self._running = False + + def run(self): + """Execute the complete test sequence.""" + self._running = True + + try: + test_current = float(self.parent.c_rate_input.text()) * float(self.parent.capacity_input.text()) + charge_cutoff = float(self.parent.charge_cutoff_input.text()) + discharge_cutoff = float(self.parent.discharge_cutoff_input.text()) + + while self._running and (self.parent.continuous_mode or self.parent.cycle_count == 0): + with QMutexLocker(self._mutex): + if self.parent.request_stop: + break + + self.parent.cycle_count += 1 + cycle = self.parent.cycle_count + + # Charge phase + self._execute_phase("charge", test_current, charge_cutoff, discharge_cutoff, charge_cutoff) + if not self._running: + break + + # Rest after charge + self._execute_rest("post-charge") + if not self._running: + break + + # Discharge phase + self._execute_phase("discharge", test_current, discharge_cutoff, discharge_cutoff, charge_cutoff) + if not self.parent.continuous_mode: + break + + # Rest after discharge + if self._running: + self._execute_rest("post-discharge") + + # Calculate efficiency + if self.parent.charge_capacity > 0: + efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100 + self.cycle_completed.emit(cycle, self.parent.capacity_ah, + self.parent.charge_capacity, efficiency) + + except Exception as e: + self.error_occurred.emit(str(e)) + finally: + self._running = False + + def _execute_phase(self, phase: str, current: float, target_voltage: float, discharge_cutoff: float, charge_cutoff: float): + """Execute charge/discharge phase. + + Args: + phase: Either 'charge' or 'discharge' + current: Current in amps + target_voltage: Target voltage in volts + """ + try: + if not hasattr(self.parent, 'dev') or not self.parent.session_active: + raise DeviceDisconnectedError("Device not connected") + + # Configure device + self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z + time.sleep(0.1) + + if phase == "charge": + self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV + self.parent.dev.channels['A'].constant(current) + self.progress_updated.emit(0.0, "Charging") + else: + self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV + self.parent.dev.channels['A'].constant(-current) + self.progress_updated.emit(0.0, "Discharging") + + time.sleep(0.1) + start_time = time.time() + last_update = start_time + + while self._running: + with QMutexLocker(self._mutex): + if self.parent.request_stop: + break + + if not self.parent.voltage_data: + time.sleep(0.1) + continue + + current_voltage = self.parent.voltage_data[-1] + current_time = time.time() + delta_t = current_time - last_update + last_update = current_time + + # Update capacity + if phase == "charge": + self.parent.charge_capacity += abs(self.parent.current_data[-1]) * delta_t / 3600 + progress = (current_voltage - discharge_cutoff) / (target_voltage - discharge_cutoff) + else: + self.parent.capacity_ah += abs(self.parent.current_data[-1]) * delta_t / 3600 + progress = (charge_cutoff - current_voltage) / (charge_cutoff - target_voltage) + + progress = max(0.0, min(1.0, progress)) + self.progress_updated.emit(progress, f"{phase.capitalize()}ing") + + # Check termination conditions + if ((phase == "charge" and current_voltage >= target_voltage) or + (phase == "discharge" and current_voltage <= target_voltage)): + break + + time.sleep(0.1) + + except Exception as e: + self.error_occurred.emit(f"{phase.capitalize()} error: {str(e)}") + raise + + def _execute_rest(self, phase: str): + """Execute rest phase. + + Args: + phase: Description of rest phase + """ + try: + self.progress_updated.emit(0.0, f"Resting ({phase})") + self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.parent.dev.channels['A'].constant(0) + + rest_time = float(self.parent.rest_time_input.text()) * 3600 + rest_end = time.time() + rest_time + + while time.time() < rest_end and self._running: + with QMutexLocker(self._mutex): + if self.parent.request_stop: + break + + progress = 1 - (rest_end - time.time()) / rest_time + self.progress_updated.emit(progress, f"Resting ({phase})") + time.sleep(1) + + except Exception as e: + self.error_occurred.emit(f"Rest phase error: {str(e)}") + raise + + def stop(self): + """Safely stop the test sequence.""" + with QMutexLocker(self._mutex): + self._running = False + self.wait(500) # Wait up to 500ms for clean exit + + +class BatteryTester(QMainWindow): + """Main application window for battery capacity testing.""" + + error_signal = pyqtSignal(str) + + def __init__(self): + """Initialize the battery tester application.""" + super().__init__() + self.error_signal.connect(self.handle_device_error) + + # Initialize data buffers + self.time_data = deque() + self.voltage_data = deque() + self.current_data = deque() + self.phase_data = deque() + + # Test variables + self.test_phase = "Ready" + self.capacity_ah = 0.0 + self.charge_capacity = 0.0 + self.coulomb_efficiency = 0.0 + self.cycle_count = 0 + + # Color scheme + self.bg_color = QColor(46, 52, 64) + self.fg_color = QColor(216, 222, 233) + self.accent_color = QColor(94, 129, 172) + self.warning_color = QColor(191, 97, 106) + self.success_color = QColor(163, 190, 140) + + # Device status + self.session_active = False + self.measuring = False + self.test_running = False + self.continuous_mode = False + self.request_stop = False + self.interval = 0.1 + self.log_dir = os.path.expanduser("~/adalm1000/logs") + os.makedirs(self.log_dir, exist_ok=True) + + # Thread management + self.measurement_thread = None + self.test_thread = None + + # Initialize UI + self._setup_ui() + + # Initialize device with delay + QTimer.singleShot(100, self.safe_init_device) + + def _setup_ui(self): + """Configure the user interface.""" + self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)") + self.resize(1000, 800) + self.setMinimumSize(800, 700) + + # Set color palette + palette = self.palette() + palette.setColor(QPalette.Window, self.bg_color) + palette.setColor(QPalette.WindowText, self.fg_color) + palette.setColor(QPalette.Base, QColor(59, 66, 82)) + palette.setColor(QPalette.Text, self.fg_color) + palette.setColor(QPalette.Button, self.accent_color) + palette.setColor(QPalette.ButtonText, self.fg_color) + self.setPalette(palette) + + # Main widget and layout + self.central_widget = QWidget() + self.setCentralWidget(self.central_widget) + self.main_layout = QVBoxLayout(self.central_widget) + self.main_layout.setContentsMargins(10, 10, 10, 10) + self.main_layout.setSpacing(10) + + # Header section + header_frame = QWidget() + header_layout = QHBoxLayout(header_frame) + header_layout.setContentsMargins(0, 0, 0, 0) + + self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)") + self.title_label.setStyleSheet(f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};") + header_layout.addWidget(self.title_label, 1) + + # Connection indicator + self.connection_label = QLabel("Disconnected") + header_layout.addWidget(self.connection_label) + + self.status_light = QLabel() + self.status_light.setFixedSize(20, 20) + self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") + header_layout.addWidget(self.status_light) + + # Reconnect button + self.reconnect_btn = QPushButton("Connect") + self.reconnect_btn.clicked.connect(self.reconnect_device) + header_layout.addWidget(self.reconnect_btn) + + self.main_layout.addWidget(header_frame) + + # Measurement display + display_frame = QFrame() + display_frame.setFrameShape(QFrame.StyledPanel) + display_layout = QGridLayout(display_frame) + display_layout.setHorizontalSpacing(15) + display_layout.setVerticalSpacing(8) + + measurement_labels = [ + ("Voltage", "V"), + ("Current", "A"), + ("Test Phase", ""), + ("Elapsed Time", "s"), + ("Discharge Capacity", "Ah"), + ("Charge Capacity", "Ah"), + ("Coulomb Efficiency", "%"), + ("Cycle Count", ""), + ] + + self.value_labels = {} # Optional: Store labels for easy update + + for i, (label, unit) in enumerate(measurement_labels): + row = i // 2 + col = (i % 2) * 3 # Each block is 3 columns: label | value | unit + + name_label = QLabel(label + ":") + name_label.setStyleSheet("font-size: 11px;") + display_layout.addWidget(name_label, row, col) + + value_label = QLabel("0.000") + value_label.setStyleSheet("font-size: 12px; font-weight: bold; min-width: 60px;") + display_layout.addWidget(value_label, row, col + 1) + + unit_label = QLabel(unit) + unit_label.setStyleSheet("font-size: 11px; color: gray;") + display_layout.addWidget(unit_label, row, col + 2) + + # Save reference for updating later + self.value_labels[label] = value_label + + # Assign to instance attributes for specific fields + if label == "Voltage": + self.voltage_label = value_label + elif label == "Current": + self.current_label = value_label + elif label == "Test Phase": + self.phase_label = value_label + elif label == "Elapsed Time": + self.time_label = value_label + elif label == "Discharge Capacity": + self.capacity_label = value_label + elif label == "Charge Capacity": + self.charge_capacity_label = value_label + elif label == "Coulomb Efficiency": + self.efficiency_label = value_label + elif label == "Cycle Count": + self.cycle_label = value_label + + self.main_layout.addWidget(display_frame) + + # Progress bar + self.progress_bar = QProgressBar() + self.progress_bar.setRange(0, 100) + self.progress_bar.setTextVisible(False) + self.progress_bar.setStyleSheet(f""" + QProgressBar {{ + border: 1px solid {self.fg_color.name()}; + border-radius: 5px; + text-align: center; + }} + QProgressBar::chunk {{ + background-color: {self.accent_color.name()}; + }} + """) + self.main_layout.addWidget(self.progress_bar) + + # Control section + controls_frame = QWidget() + controls_layout = QHBoxLayout(controls_frame) + controls_layout.setContentsMargins(0, 0, 0, 0) + + # Parameters frame + params_frame = QFrame() + params_frame.setFrameShape(QFrame.StyledPanel) + params_layout = QGridLayout(params_frame) + + # Battery capacity + params_layout.addWidget(QLabel("Battery Capacity (Ah):"), 0, 0) + self.capacity_input = QLineEdit("0.2") + self.capacity_input.setFixedWidth(80) + self.capacity_input.setToolTip("Nominal capacity of the battery in Amp-hours") + params_layout.addWidget(self.capacity_input, 0, 1) + + # Charge cutoff voltage + params_layout.addWidget(QLabel("Charge Cutoff (V):"), 1, 0) + self.charge_cutoff_input = QLineEdit("1.43") + self.charge_cutoff_input.setFixedWidth(80) + self.charge_cutoff_input.setToolTip("Voltage at which charging should stop") + params_layout.addWidget(self.charge_cutoff_input, 1, 1) + + # Discharge cutoff voltage + params_layout.addWidget(QLabel("Discharge Cutoff (V):"), 2, 0) + self.discharge_cutoff_input = QLineEdit("0.9") + self.discharge_cutoff_input.setFixedWidth(80) + self.discharge_cutoff_input.setToolTip("Voltage at which discharging should stop") + params_layout.addWidget(self.discharge_cutoff_input, 2, 1) + + # Rest time + params_layout.addWidget(QLabel("Rest Time (hours):"), 3, 0) + self.rest_time_input = QLineEdit("0.25") + self.rest_time_input.setFixedWidth(80) + self.rest_time_input.setToolTip("Rest period between charge/discharge cycles") + params_layout.addWidget(self.rest_time_input, 3, 1) + + # C-Rate for test + params_layout.addWidget(QLabel("Test C-Rate:"), 0, 2) + self.c_rate_input = QLineEdit("0.1") + self.c_rate_input.setFixedWidth(60) + self.c_rate_input.setToolTip("Charge/discharge rate relative to battery capacity (e.g., 0.2 for C/5)") + params_layout.addWidget(self.c_rate_input, 0, 3) + params_layout.addWidget(QLabel("(e.g., 0.2 for C/5)"), 0, 4) + + controls_layout.addWidget(params_frame, 1) + + # Button area + button_frame = QWidget() + button_layout = QVBoxLayout(button_frame) + button_layout.setContentsMargins(0, 0, 0, 0) + + self.start_button = QPushButton("START TEST") + self.start_button.clicked.connect(self.start_test) + self.start_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.accent_color.name()}; + font-weight: bold; + padding: 8px; + border-radius: 5px; + }} + QPushButton:disabled {{ + background-color: #4C566A; + }} + """) + self.start_button.setEnabled(False) + button_layout.addWidget(self.start_button) + + self.stop_button = QPushButton("STOP TEST") + self.stop_button.clicked.connect(self.stop_test) + self.stop_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.warning_color.name()}; + font-weight: bold; + padding: 8px; + border-radius: 5px; + }} + QPushButton:disabled {{ + background-color: #4C566A; + }} + """) + self.stop_button.setEnabled(False) + button_layout.addWidget(self.stop_button) + + # Continuous mode checkbox + self.continuous_check = QCheckBox("Continuous Mode") + self.continuous_check.setChecked(True) + self.continuous_check.setToolTip("Run multiple charge/discharge cycles until stopped") + button_layout.addWidget(self.continuous_check) + + controls_layout.addWidget(button_frame) + self.main_layout.addWidget(controls_frame) + + # Plot area + self._setup_plot() + self.main_layout.addWidget(self.plot_widget, 1) + + # Status bar + self.status_bar = QLabel("Ready") + self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;") + self.main_layout.addWidget(self.status_bar) + + def _setup_plot(self): + """Configure the matplotlib plot.""" + self.plot_widget = QWidget() + plot_layout = QVBoxLayout(self.plot_widget) + + self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440') + self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) + self.ax = self.fig.add_subplot(111) + self.ax.set_facecolor('#3B4252') + + # Initial voltage range + voltage_padding = 0.2 + min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) + max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding + self.ax.set_ylim(min_voltage, max_voltage) + + # Voltage plot + self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) + self.ax.set_ylabel("Voltage (V)", color='#00BFFF') + self.ax.tick_params(axis='y', labelcolor='#00BFFF') + + # Current plot (right axis) + self.ax2 = self.ax.twinx() + current_padding = 0.05 + test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text()) + max_current = test_current * 1.5 + self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) + + self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) + self.ax2.set_ylabel("Current (A)", color='r') + self.ax2.tick_params(axis='y', labelcolor='r') + + self.ax.set_xlabel('Time (s)', color=self.fg_color.name()) + self.ax.set_title('Battery Test (CC)', color=self.fg_color.name()) + self.ax.tick_params(axis='x', colors=self.fg_color.name()) + self.ax.grid(True, color='#4C566A') + + # Position legends + self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) + self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) + + # Embed plot + self.canvas = FigureCanvas(self.fig) + plot_layout.addWidget(self.canvas) + + def safe_init_device(self): + """Safe device initialization with error handling.""" + try: + self.init_device() + except Exception as e: + self.handle_device_error(str(e)) + + def init_device(self): + """Initialize the ADALM1000 device.""" + # Temporarily enable USB debugging + os.environ['LIBUSB_DEBUG'] = '3' # Set to 0 in production + self.cleanup_device() + + try: + print("Waiting before initializing session...") + time.sleep(1.5) # Delay helps avoid "device busy" issues + + self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) + + # 🔍 Log detected devices + print(f"Devices found: {self.session.devices}") + + if not self.session.devices: + raise Exception("No ADALM1000 detected - check USB connection") + + self.dev = self.session.devices[0] + + # Reset channels + self.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.dev.channels['B'].mode = pysmu.Mode.HI_Z + self.dev.channels['A'].constant(0) + self.dev.channels['B'].constant(0) + + self.session.start(0) + + # Update UI + self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") + self.connection_label.setText("Connected") + self.status_bar.setText("Device connected | Ready for measurement") + self.session_active = True + self.start_button.setEnabled(True) + + # Start measurement thread + self.start_measurement_thread() + + except Exception as e: + raise Exception(f"Device initialization failed: {str(e)}") + + def cleanup_device(self): + """Clean up device resources.""" + print("Cleaning up device session...") + + # Stop measurement thread + if self.measurement_thread is not None: + try: + self.measurement_thread.stop() + if not self.measurement_thread.wait(1000): + print("Warning: Measurement thread didn't stop cleanly") + self.measurement_thread = None + except Exception as e: + print(f"Error stopping measurement thread: {e}") + + # Stop and delete session + if hasattr(self, 'session'): + try: + if self.session_active: + time.sleep(0.1) + self.session.end() + self.session_active = False + del self.session + print("Session ended successfully") + except Exception as e: + print(f"Error ending session: {e}") + finally: + self.session_active = False + + # Reset UI indicators + self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") + self.connection_label.setText("Disconnected") + self.start_button.setEnabled(False) + self.stop_button.setEnabled(False) + + def start_measurement_thread(self): + """Start the measurement thread.""" + if self.measurement_thread is not None: + self.measurement_thread.stop() + self.measurement_thread.wait(500) + + self.measurement_thread = MeasurementThread( + device=self.dev, + interval=self.interval, + start_time=self.start_time + + self.measurement_thread = MeasurementThread(self.dev, self.interval) + self.measurement_thread.update_signal.connect(self.update_measurements) + self.measurement_thread.error_signal.connect(self.handle_device_error) + self.measurement_thread.start() + + def reconnect_device(self): + """Attempt to reconnect the device.""" + self.status_bar.setText("Attempting to reconnect...") + self.cleanup_device() + QTimer.singleShot(2000, self.safe_init_device) # Retry with delay + + def handle_device_error(self, error_msg): + """Thread-safe device error handling.""" + try: + # Ensure this runs in the main thread + if QThread.currentThread() != self.thread(): + self.error_signal.emit(str(error_msg)) + return + + print(f"Device error: {error_msg}") + + def update_ui(): + self.status_bar.setText(f"Device error: {error_msg}") + if self.isVisible(): + QMessageBox.critical( + self, + "Device Error", + f"Device error occurred:\n{error_msg}\n\n" + "1. Check USB connection\n" + "2. Try manual reconnect\n" + "3. Restart application if problems persist" + ) + + QTimer.singleShot(0, lambda: ( + self.cleanup_device(), + update_ui() + )) + + except Exception as e: + print(f"Error in error handler: {str(e)}") + try: + self.cleanup_device() + except: + pass + + def start_test(self): + """Start the complete battery test cycle.""" + if not self.test_running: + try: + # Get and validate input values + capacity = float(self.capacity_input.text()) + charge_cutoff = float(self.charge_cutoff_input.text()) + discharge_cutoff = float(self.discharge_cutoff_input.text()) + c_rate = float(self.c_rate_input.text()) + + if capacity <= 0: + raise ValueError("Battery capacity must be positive") + if charge_cutoff <= discharge_cutoff: + raise ValueError("Charge cutoff must be higher than discharge cutoff") + if c_rate <= 0: + raise ValueError("C-rate must be positive") + + self.continuous_mode = self.continuous_check.isChecked() + test_current = c_rate * capacity + + if test_current > 0.2: + raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") + + # Reset data + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + self.capacity_ah = 0.0 + self.charge_capacity = 0.0 + self.coulomb_efficiency = 0.0 + self.cycle_count = 0 + self.start_time = time.time() + self.time_label.setText("00:00:00") + self.reset_plot() + + # Prepare log file + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}") + + # Start test + self.test_running = True + self.start_time = time.time() + self.test_phase = "Initial Discharge" + self.phase_label.setText(self.test_phase) + + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + self.status_bar.setText(f"Test started | Discharging to {discharge_cutoff}V @ {test_current:.3f}A") + + # Start test sequence in separate thread + self.test_thread = TestSequenceThread(self) + self.test_thread.progress_updated.connect(self.update_test_progress) + self.test_thread.cycle_completed.connect(self.update_cycle_stats) + self.test_thread.error_occurred.connect(self.handle_device_error) + self.test_thread.finished.connect(self.finalize_test) + self.test_thread.start() + + except Exception as e: + QMessageBox.critical(self, "Error", str(e)) + + def update_test_progress(self, progress: float, phase: str): + """Update test progress and phase display.""" + self.test_phase = phase + self.phase_label.setText(phase) + self.progress_bar.setValue(int(progress * 100)) + + def update_cycle_stats(self, cycle: int, discharge: float, charge: float, efficiency: float): + """Update cycle statistics.""" + self.cycle_count = cycle + self.capacity_ah = discharge + self.charge_capacity = charge + self.coulomb_efficiency = efficiency + + self.cycle_label.setText(f"{cycle}") + self.capacity_label.setText(f"{discharge:.4f}") + self.charge_capacity_label.setText(f"{charge:.4f}") + self.efficiency_label.setText(f"{efficiency:.1f}") + + self.status_bar.setText( + f"Cycle {cycle} completed | " + f"Discharge: {discharge:.3f}Ah | " + f"Charge: {charge:.3f}Ah | " + f"Efficiency: {efficiency:.1f}%" + ) + + # Write cycle summary + self.write_cycle_summary() + + def stop_test(self): + """Safely stop the running test.""" + if not self.test_running: + return + + self.request_stop = True + self.test_running = False + self.measuring = False + self.test_phase = "Ready" + self.phase_label.setText(self.test_phase) + + if hasattr(self, 'dev'): + try: + self.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.dev.channels['A'].constant(0) + except Exception as e: + print(f"Error resetting device: {e}") + + # Update UI + self.status_bar.setText("Test stopped - Ready for new test") + self.stop_button.setEnabled(False) + self.start_button.setEnabled(True) + + self.finalize_test(show_message=False) + + def finalize_test(self, show_message: bool = True): + """Final cleanup after test completion.""" + try: + # Write log data + if hasattr(self, 'log_buffer') and self.log_buffer: + try: + with open(self.filename, 'a', newline='') as f: + writer = csv.writer(f) + writer.writerows(self.log_buffer) + self.log_buffer.clear() + except Exception as e: + print(f"Error writing log buffer: {e}") + + # Close log file + if hasattr(self, 'current_cycle_file') and self.current_cycle_file: + try: + self.current_cycle_file.flush() + os.fsync(self.current_cycle_file.fileno()) + self.current_cycle_file.close() + except Exception as e: + print(f"Error closing log file: {e}") + + # Show notification + if show_message: + msg_box = QMessageBox(self) + msg_box.setWindowFlags(msg_box.windowFlags() | + Qt.WindowStaysOnTopHint) + msg_box.setIcon(QMessageBox.Information) + msg_box.setWindowTitle("Test Complete") + msg_box.setText(f"Test completed\nCycles: {self.cycle_count}") + msg_box.exec_() + + except Exception as e: + print(f"Critical error in finalize_test: {e}") + finally: + # Reset test status + self.test_running = False + self.request_stop = True + self.measuring = False + + def update_measurements(self, voltage: float, current: float, current_time: float): + """Update measurements in the UI.""" + if len(self.time_data) > 10000: # Limit data points + self.time_data.popleft() + self.voltage_data.popleft() + self.current_data.popleft() + + self.time_data.append(current_time) + self.voltage_data.append(voltage) + self.current_data.append(current) + + # Update UI + self.voltage_label.setText(f"{voltage:.4f}") + self.current_label.setText(f"{current:.4f}") + self.time_label.setText(self.format_time(current_time)) + + # Update plot periodically + if len(self.time_data) % 10 == 0: + self.update_plot() + + # Log data if test is running + if self.test_running and hasattr(self, 'current_cycle_file'): + self.log_buffer.append([ + f"{current_time:.3f}", + f"{voltage:.6f}", + f"{current:.6f}", + self.test_phase, + f"{self.capacity_ah:.4f}", + f"{self.charge_capacity:.4f}", + f"{self.coulomb_efficiency:.1f}", + f"{self.cycle_count}" + ]) + + # Write data in blocks + if len(self.log_buffer) >= 10: + with open(self.filename, 'a', newline='') as f: + writer = csv.writer(f) + writer.writerows(self.log_buffer) + self.log_buffer.clear() + + def update_plot(self): + """Update the plot with new data.""" + if not self.time_data: + return + + self.line_voltage.set_data(list(self.time_data), list(self.voltage_data)) + self.line_current.set_data(list(self.time_data), list(self.current_data)) + self.auto_scale_axes() + self.canvas.draw_idle() + + def auto_scale_axes(self): + """Automatically adjust plot axes.""" + if not self.time_data: + return + + # X-axis adjustment + max_time = list(self.time_data)[-1] + current_xlim = self.ax.get_xlim() + if max_time > current_xlim[1] * 0.95: + new_xmax = max_time * 1.10 # 10% padding + self.ax.set_xlim(0, new_xmax) + self.ax2.set_xlim(0, new_xmax) + + # Y-axes adjustment + if self.voltage_data: + voltage_padding = 0.2 + min_v = max(0, min(list(self.voltage_data)) - voltage_padding) + max_v = min(5.0, max(list(self.voltage_data)) + voltage_padding) + current_ylim = self.ax.get_ylim() + new_min = current_ylim[0] + (min_v - current_ylim[0]) * 0.1 + new_max = current_ylim[1] + (max_v - current_ylim[1]) * 0.1 + self.ax.set_ylim(new_min, new_max) + + if self.current_data: + current_padding = 0.05 + min_c = max(-0.25, min(list(self.current_data)) - current_padding) + max_c = min(0.25, max(list(self.current_data)) + current_padding) + current_ylim = self.ax2.get_ylim() + new_min = current_ylim[0] + (min_c - current_ylim[0]) * 0.1 + new_max = current_ylim[1] + (max_c - current_ylim[1]) * 0.1 + self.ax2.set_ylim(new_min, new_max) + + @staticmethod + def format_time(seconds: float) -> str: + """Format seconds as HH:MM:SS. + + Args: + seconds: Time in seconds + + Returns: + Formatted time string + """ + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + seconds = int(seconds % 60) + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" + + def reset_plot(self): + """Reset the plot to initial state.""" + self.line_voltage.set_data([], []) + self.line_current.set_data([], []) + + # Reset axes to starting values + self.ax.set_xlim(0, 10) + voltage_padding = 0.2 + min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) + max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding + self.ax.set_ylim(min_voltage, max_voltage) + + self.canvas.draw() + + def closeEvent(self, event): + """Clean up when closing the window.""" + self.cleanup_device() + event.accept() + + +if __name__ == "__main__": + app = QApplication([]) + app.setStyle('Fusion') + window = BatteryTester() + window.show() + app.exec_() \ No newline at end of file