diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py index 1b3092e..72c1bfc 100644 --- a/MainCode/adalm1000_logger.py +++ b/MainCode/adalm1000_logger.py @@ -2,19 +2,18 @@ import os import time import csv -import threading from datetime import datetime import numpy as np -from collections import deque -# Warnungen unterdrücken +# Suppress warnings os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false' os.environ['LIBUSB_DEBUG'] = '0' from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QPushButton, QLineEdit, QFrame, - QCheckBox, QMessageBox, QFileDialog) -from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread + QCheckBox, QMessageBox, QFileDialog, QProgressBar) +from PyQt5.QtCore import (Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread, + QMutex, QMutexLocker, QQueue) from PyQt5.QtGui import QColor, QPalette import pysmu @@ -23,35 +22,46 @@ matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure + class DeviceDisconnectedError(Exception): + """Custom exception for device disconnection events.""" pass + class MeasurementThread(QThread): + """Thread for continuous measurement of voltage and current.""" + update_signal = pyqtSignal(float, float, float) # voltage, current, timestamp error_signal = pyqtSignal(str) - status_signal = pyqtSignal(str) # New: for status updates + status_signal = pyqtSignal(str) - def __init__(self, device, interval=0.1): + def __init__(self, device: object, interval: float = 0.1): + """Initialize measurement thread. + + Args: + device: ADALM1000 device object + interval: Measurement interval in seconds + """ super().__init__() self.device = device - self.interval = max(0.05, interval) # Ensure minimum interval + self.interval = max(0.05, interval) # Minimum interval self._running = False - self._lock = threading.Lock() # Thread safety + self._mutex = QMutex() # Thread safety self.filter_window_size = 10 self.start_time = time.time() self.last_update_time = self.start_time def run(self): - """Main measurement loop with enhanced error handling""" + """Main measurement loop with enhanced error handling.""" self._running = True - voltage_window = deque(maxlen=self.filter_window_size) - current_window = deque(maxlen=self.filter_window_size) + voltage_window = QQueue() + current_window = QQueue() self.status_signal.emit("Measurement started") while self._running: try: - # 1. Read samples with timeout + # Read samples with timeout samples = self.device.read( self.filter_window_size, timeout=500, @@ -60,23 +70,27 @@ class MeasurementThread(QThread): if not samples: raise DeviceDisconnectedError("No samples received - device may be disconnected") - # 2. Process samples (thread-safe) - with self._lock: + # Process samples (thread-safe) + with QMutexLocker(self._mutex): raw_voltage = np.mean([s[1][0] for s in samples]) raw_current = np.mean([s[0][1] for s in samples]) - voltage_window.append(raw_voltage) - current_window.append(raw_current) + if voltage_window.size() >= self.filter_window_size: + voltage_window.dequeue() + current_window.dequeue() - voltage = np.mean(voltage_window) - current = np.mean(current_window) + voltage_window.enqueue(raw_voltage) + current_window.enqueue(raw_current) + + voltage = np.mean(list(voltage_window)) + current = np.mean(list(current_window)) current_time = time.time() - self.start_time - # 3. Emit updates + # Emit updates self.update_signal.emit(voltage, current, current_time) self.last_update_time = time.time() - # 4. Dynamic sleep adjustment + # Dynamic sleep adjustment elapsed = time.time() - self.last_update_time sleep_time = max(0.01, self.interval - elapsed) time.sleep(sleep_time) @@ -93,43 +107,216 @@ class MeasurementThread(QThread): self._running = False def stop(self): - """Safe thread termination with timeout""" + """Safe thread termination with timeout.""" self._running = False if self.isRunning(): self.quit() if not self.wait(300): # 300ms grace period - self.terminate() # Forceful termination if needed + self.terminate() - def is_active(self): - """Check if thread is running and updating""" - with self._lock: + def is_active(self) -> bool: + """Check if thread is running and updating.""" + with QMutexLocker(self._mutex): return self._running and (time.time() - self.last_update_time < 2.0) -class BatteryTester(QMainWindow): - def __init__(self): + +class TestSequenceThread(QThread): + """Thread for executing battery test sequences.""" + + progress_updated = pyqtSignal(float, str) # progress, phase + cycle_completed = pyqtSignal(int, float, float, float) # cycle, discharge, charge, efficiency + error_occurred = pyqtSignal(str) + + def __init__(self, parent: QObject): + """Initialize test sequence thread. + + Args: + parent: Reference to main BatteryTester instance + """ super().__init__() + self.parent = parent + self._mutex = QMutex() + self._running = False + + def run(self): + """Execute the complete test sequence.""" + self._running = True - # Initialisierung aller Attribute - self.time_data = deque() - self.voltage_data = deque() - self.current_data = deque() - self.phase_data = deque() + try: + test_current = float(self.parent.c_rate_input.text()) * float(self.parent.capacity_input.text()) + charge_cutoff = float(self.parent.charge_cutoff_input.text()) + discharge_cutoff = float(self.parent.discharge_cutoff_input.text()) + + while self._running and (self.parent.continuous_mode or self.parent.cycle_count == 0): + with QMutexLocker(self._mutex): + if self.parent.request_stop: + break + + self.parent.cycle_count += 1 + cycle = self.parent.cycle_count + + # Charge phase + self._execute_phase("charge", test_current, charge_cutoff) + if not self._running: + break + + # Rest after charge + self._execute_rest("post-charge") + if not self._running: + break + + # Discharge phase + self._execute_phase("discharge", test_current, discharge_cutoff) + if not self.parent.continuous_mode: + break + + # Rest after discharge + if self._running: + self._execute_rest("post-discharge") + + # Calculate efficiency + if self.parent.charge_capacity > 0: + efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100 + self.cycle_completed.emit(cycle, self.parent.capacity_ah, + self.parent.charge_capacity, efficiency) + + except Exception as e: + self.error_occurred.emit(str(e)) + finally: + self._running = False + + def _execute_phase(self, phase: str, current: float, target_voltage: float): + """Execute charge/discharge phase. - # Testvariablen - self.test_phase = "Bereit" + Args: + phase: Either 'charge' or 'discharge' + current: Current in amps + target_voltage: Target voltage in volts + """ + try: + if not hasattr(self.parent, 'dev') or not self.parent.session_active: + raise DeviceDisconnectedError("Device not connected") + + # Configure device + self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z + time.sleep(0.1) + + if phase == "charge": + self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV + self.parent.dev.channels['A'].constant(current) + self.progress_updated.emit(0.0, "Charging") + else: + self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV + self.parent.dev.channels['A'].constant(-current) + self.progress_updated.emit(0.0, "Discharging") + + time.sleep(0.1) + start_time = time.time() + last_update = start_time + + while self._running: + with QMutexLocker(self._mutex): + if self.parent.request_stop: + break + + if not self.parent.voltage_data: + time.sleep(0.1) + continue + + current_voltage = self.parent.voltage_data[-1] + current_time = time.time() + delta_t = current_time - last_update + last_update = current_time + + # Update capacity + if phase == "charge": + self.parent.charge_capacity += abs(self.parent.current_data[-1]) * delta_t / 3600 + progress = (current_voltage - self.parent.discharge_cutoff) / \ + (target_voltage - self.parent.discharge_cutoff) + else: + self.parent.capacity_ah += abs(self.parent.current_data[-1]) * delta_t / 3600 + progress = (self.parent.charge_cutoff - current_voltage) / \ + (self.parent.charge_cutoff - target_voltage) + + progress = max(0.0, min(1.0, progress)) + self.progress_updated.emit(progress, f"{phase.capitalize()}ing") + + # Check termination conditions + if ((phase == "charge" and current_voltage >= target_voltage) or + (phase == "discharge" and current_voltage <= target_voltage)): + break + + time.sleep(0.1) + + except Exception as e: + self.error_occurred.emit(f"{phase.capitalize()} error: {str(e)}") + raise + + def _execute_rest(self, phase: str): + """Execute rest phase. + + Args: + phase: Description of rest phase + """ + try: + self.progress_updated.emit(0.0, f"Resting ({phase})") + self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.parent.dev.channels['A'].constant(0) + + rest_time = float(self.parent.rest_time_input.text()) * 3600 + rest_end = time.time() + rest_time + + while time.time() < rest_end and self._running: + with QMutexLocker(self._mutex): + if self.parent.request_stop: + break + + progress = 1 - (rest_end - time.time()) / rest_time + self.progress_updated.emit(progress, f"Resting ({phase})") + time.sleep(1) + + except Exception as e: + self.error_occurred.emit(f"Rest phase error: {str(e)}") + raise + + def stop(self): + """Safely stop the test sequence.""" + with QMutexLocker(self._mutex): + self._running = False + self.wait(500) # Wait up to 500ms for clean exit + + +class BatteryTester(QMainWindow): + """Main application window for battery capacity testing.""" + + error_signal = pyqtSignal(str) + + def __init__(self): + """Initialize the battery tester application.""" + super().__init__() + self.error_signal.connect(self.handle_device_error) + + # Initialize data buffers + self.time_data = QQueue() + self.voltage_data = QQueue() + self.current_data = QQueue() + self.phase_data = QQueue() + + # Test variables + self.test_phase = "Ready" self.capacity_ah = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 - # Farbschema + # Color scheme self.bg_color = QColor(46, 52, 64) self.fg_color = QColor(216, 222, 233) self.accent_color = QColor(94, 129, 172) self.warning_color = QColor(191, 97, 106) self.success_color = QColor(163, 190, 140) - # Gerätestatus + # Device status self.session_active = False self.measuring = False self.test_running = False @@ -139,23 +326,23 @@ class BatteryTester(QMainWindow): self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) - # Thread-Management + # Thread management self.measurement_thread = None self.test_thread = None - # UI initialisieren - self.setup_ui() + # Initialize UI + self._setup_ui() - # Gerät verzögert initialisieren + # Initialize device with delay QTimer.singleShot(100, self.safe_init_device) - def setup_ui(self): - """Konfiguriert die Benutzeroberfläche""" - self.setWindowTitle("ADALM1000 - Batteriekapazitätstester (CC-Test)") + def _setup_ui(self): + """Configure the user interface.""" + self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)") self.resize(1000, 800) self.setMinimumSize(800, 700) - # Hintergrundfarbe setzen + # Set color palette palette = self.palette() palette.setColor(QPalette.Window, self.bg_color) palette.setColor(QPalette.WindowText, self.fg_color) @@ -165,24 +352,24 @@ class BatteryTester(QMainWindow): palette.setColor(QPalette.ButtonText, self.fg_color) self.setPalette(palette) - # Hauptwidget und Layout + # Main widget and layout self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.main_layout = QVBoxLayout(self.central_widget) self.main_layout.setContentsMargins(10, 10, 10, 10) self.main_layout.setSpacing(10) - # Kopfbereich + # Header section header_frame = QWidget() header_layout = QHBoxLayout(header_frame) header_layout.setContentsMargins(0, 0, 0, 0) - self.title_label = QLabel("ADALM1000 Batteriekapazitätstester (CC-Test)") + self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)") self.title_label.setStyleSheet(f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};") header_layout.addWidget(self.title_label, 1) - # Statusindikator - self.connection_label = QLabel("Getrennt") + # Connection indicator + self.connection_label = QLabel("Disconnected") header_layout.addWidget(self.connection_label) self.status_light = QLabel() @@ -190,27 +377,27 @@ class BatteryTester(QMainWindow): self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") header_layout.addWidget(self.status_light) - # Verbinden-Button - self.reconnect_btn = QPushButton("Verbinden") + # Reconnect button + self.reconnect_btn = QPushButton("Connect") self.reconnect_btn.clicked.connect(self.reconnect_device) header_layout.addWidget(self.reconnect_btn) self.main_layout.addWidget(header_frame) - # Messanzeige + # Measurement display display_frame = QFrame() display_frame.setFrameShape(QFrame.StyledPanel) display_layout = QGridLayout(display_frame) measurement_labels = [ - ("Spannung (V)", "V"), - ("Strom (A)", "A"), - ("Testphase", ""), - ("Verstrichene Zeit", "s"), - ("Entladekapazität", "Ah"), - ("Ladekapazität", "Ah"), - ("Coulomb-Eff.", "%"), - ("Zykluszählung", ""), + ("Voltage (V)", "V"), + ("Current (A)", "A"), + ("Test Phase", ""), + ("Elapsed Time", "s"), + ("Discharge Capacity", "Ah"), + ("Charge Capacity", "Ah"), + ("Coulomb Eff.", "%"), + ("Cycle Count", ""), ] for i, (label, unit) in enumerate(measurement_labels): @@ -249,85 +436,127 @@ class BatteryTester(QMainWindow): self.main_layout.addWidget(display_frame) - # Steuerbereich + # Progress bar + self.progress_bar = QProgressBar() + self.progress_bar.setRange(0, 100) + self.progress_bar.setTextVisible(False) + self.progress_bar.setStyleSheet(f""" + QProgressBar {{ + border: 1px solid {self.fg_color.name()}; + border-radius: 5px; + text-align: center; + }} + QProgressBar::chunk {{ + background-color: {self.accent_color.name()}; + }} + """) + self.main_layout.addWidget(self.progress_bar) + + # Control section controls_frame = QWidget() controls_layout = QHBoxLayout(controls_frame) controls_layout.setContentsMargins(0, 0, 0, 0) - # Parameterrahmen + # Parameters frame params_frame = QFrame() params_frame.setFrameShape(QFrame.StyledPanel) params_layout = QGridLayout(params_frame) - # Batteriekapazität - params_layout.addWidget(QLabel("Batteriekapazität (Ah):"), 0, 0) + # Battery capacity + params_layout.addWidget(QLabel("Battery Capacity (Ah):"), 0, 0) self.capacity_input = QLineEdit("0.2") self.capacity_input.setFixedWidth(80) + self.capacity_input.setToolTip("Nominal capacity of the battery in Amp-hours") params_layout.addWidget(self.capacity_input, 0, 1) - # Lade-Endspannung - params_layout.addWidget(QLabel("Lade-Endspannung (V):"), 1, 0) + # Charge cutoff voltage + params_layout.addWidget(QLabel("Charge Cutoff (V):"), 1, 0) self.charge_cutoff_input = QLineEdit("1.43") self.charge_cutoff_input.setFixedWidth(80) + self.charge_cutoff_input.setToolTip("Voltage at which charging should stop") params_layout.addWidget(self.charge_cutoff_input, 1, 1) - # Entlade-Endspannung - params_layout.addWidget(QLabel("Entlade-Endspannung (V):"), 2, 0) + # Discharge cutoff voltage + params_layout.addWidget(QLabel("Discharge Cutoff (V):"), 2, 0) self.discharge_cutoff_input = QLineEdit("0.9") self.discharge_cutoff_input.setFixedWidth(80) + self.discharge_cutoff_input.setToolTip("Voltage at which discharging should stop") params_layout.addWidget(self.discharge_cutoff_input, 2, 1) - # Ruhezeit - params_layout.addWidget(QLabel("Ruhezeit (Stunden):"), 3, 0) + # Rest time + params_layout.addWidget(QLabel("Rest Time (hours):"), 3, 0) self.rest_time_input = QLineEdit("0.25") self.rest_time_input.setFixedWidth(80) + self.rest_time_input.setToolTip("Rest period between charge/discharge cycles") params_layout.addWidget(self.rest_time_input, 3, 1) - # C-Rate für Test - params_layout.addWidget(QLabel("Test-C-Rate:"), 0, 2) + # C-Rate for test + params_layout.addWidget(QLabel("Test C-Rate:"), 0, 2) self.c_rate_input = QLineEdit("0.1") self.c_rate_input.setFixedWidth(60) + self.c_rate_input.setToolTip("Charge/discharge rate relative to battery capacity (e.g., 0.2 for C/5)") params_layout.addWidget(self.c_rate_input, 0, 3) - params_layout.addWidget(QLabel("(z.B. 0.2 für C/5)"), 0, 4) + params_layout.addWidget(QLabel("(e.g., 0.2 for C/5)"), 0, 4) controls_layout.addWidget(params_frame, 1) - # Button-Bereich + # Button area button_frame = QWidget() button_layout = QVBoxLayout(button_frame) button_layout.setContentsMargins(0, 0, 0, 0) - self.start_button = QPushButton("TEST STARTEN") + self.start_button = QPushButton("START TEST") self.start_button.clicked.connect(self.start_test) - self.start_button.setStyleSheet(f"background-color: {self.accent_color.name()}; font-weight: bold;") + self.start_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.accent_color.name()}; + font-weight: bold; + padding: 8px; + border-radius: 5px; + }} + QPushButton:disabled {{ + background-color: #4C566A; + }} + """) self.start_button.setEnabled(False) button_layout.addWidget(self.start_button) - self.stop_button = QPushButton("TEST STOPPEN") + self.stop_button = QPushButton("STOP TEST") self.stop_button.clicked.connect(self.stop_test) - self.stop_button.setStyleSheet(f"background-color: {self.warning_color.name()}; font-weight: bold;") + self.stop_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.warning_color.name()}; + font-weight: bold; + padding: 8px; + border-radius: 5px; + }} + QPushButton:disabled {{ + background-color: #4C566A; + }} + """) self.stop_button.setEnabled(False) button_layout.addWidget(self.stop_button) - # Kontinuierlicher Modus - self.continuous_check = QCheckBox("Kontinuierlicher Modus") + # Continuous mode checkbox + self.continuous_check = QCheckBox("Continuous Mode") self.continuous_check.setChecked(True) + self.continuous_check.setToolTip("Run multiple charge/discharge cycles until stopped") button_layout.addWidget(self.continuous_check) controls_layout.addWidget(button_frame) self.main_layout.addWidget(controls_frame) - # Plotbereich - self.setup_plot() + # Plot area + self._setup_plot() self.main_layout.addWidget(self.plot_widget, 1) - # Statusleiste - self.status_bar = QLabel("Bereit") + # Status bar + self.status_bar = QLabel("Ready") self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;") self.main_layout.addWidget(self.status_bar) - def setup_plot(self): - """Konfiguriert den Matplotlib-Plot""" + def _setup_plot(self): + """Configure the matplotlib plot.""" self.plot_widget = QWidget() plot_layout = QVBoxLayout(self.plot_widget) @@ -336,64 +565,64 @@ class BatteryTester(QMainWindow): self.ax = self.fig.add_subplot(111) self.ax.set_facecolor('#3B4252') - # Initiale Spannungsbereich + # Initial voltage range voltage_padding = 0.2 min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) - # Spannungsplot - self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Spannung (V)', linewidth=2) - self.ax.set_ylabel("Spannung (V)", color='#00BFFF') + # Voltage plot + self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) + self.ax.set_ylabel("Voltage (V)", color='#00BFFF') self.ax.tick_params(axis='y', labelcolor='#00BFFF') - # Stromplot (rechte Achse) + # Current plot (right axis) self.ax2 = self.ax.twinx() current_padding = 0.05 test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text()) max_current = test_current * 1.5 self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) - self.line_current, = self.ax2.plot([], [], 'r-', label='Strom (A)', linewidth=2) - self.ax2.set_ylabel("Strom (A)", color='r') + self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) + self.ax2.set_ylabel("Current (A)", color='r') self.ax2.tick_params(axis='y', labelcolor='r') - self.ax.set_xlabel('Zeit (s)', color=self.fg_color.name()) - self.ax.set_title('Batterietest (CC)', color=self.fg_color.name()) + self.ax.set_xlabel('Time (s)', color=self.fg_color.name()) + self.ax.set_title('Battery Test (CC)', color=self.fg_color.name()) self.ax.tick_params(axis='x', colors=self.fg_color.name()) self.ax.grid(True, color='#4C566A') - # Legenden positionieren + # Position legends self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) - # Plot einbetten + # Embed plot self.canvas = FigureCanvas(self.fig) plot_layout.addWidget(self.canvas) def safe_init_device(self): - """Sichere Geräteinitialisierung mit Fehlerbehandlung""" + """Safe device initialization with error handling.""" try: self.init_device() except Exception as e: self.handle_device_error(str(e)) def init_device(self): - """Initialisiert das ADALM1000-Gerät""" + """Initialize the ADALM1000 device.""" # Temporarily enable USB debugging os.environ['LIBUSB_DEBUG'] = '3' # Set to 0 in production self.cleanup_device() try: - # Verzögerung zur Vermeidung von "Device busy" Fehlern + # Delay to avoid "Device busy" errors time.sleep(1.5) self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) if not self.session.devices: - raise Exception("Kein ADALM1000 erkannt - Verbindung prüfen") + raise Exception("No ADALM1000 detected - check connection") self.dev = self.session.devices[0] - # Kanäle zurücksetzen + # Reset channels self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) @@ -402,52 +631,50 @@ class BatteryTester(QMainWindow): self.session.start(0) self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") - self.connection_label.setText("Verbunden") - self.status_bar.setText("Gerät verbunden | Bereit zur Messung") + self.connection_label.setText("Connected") + self.status_bar.setText("Device connected | Ready for measurement") self.session_active = True self.start_button.setEnabled(True) - # Mess-Thread starten + # Start measurement thread self.start_measurement_thread() except Exception as e: - raise Exception(f"Geräteinitialisierung fehlgeschlagen: {str(e)}") + raise Exception(f"Device initialization failed: {str(e)}") def cleanup_device(self): - """Bereinigt Geräteressourcen""" - # Mess-Thread stoppen + """Clean up device resources.""" + # Stop measurement thread if self.measurement_thread is not None: try: self.measurement_thread.stop() - # Wait for thread to finish if not self.measurement_thread.wait(1000): # 1 second timeout print("Warning: Measurement thread didn't stop cleanly") self.measurement_thread = None except Exception as e: - print(f"Fehler beim Stoppen des Mess-Threads: {e}") + print(f"Error stopping measurement thread: {e}") - # Sitzung bereinigen + # Clean up session if hasattr(self, 'session'): try: if self.session_active: - # Add small delay before ending session time.sleep(0.1) self.session.end() self.session_active = False del self.session except Exception as e: - print(f"Fehler beim Bereinigen der Sitzung: {e}") + print(f"Error cleaning up session: {e}") finally: self.session_active = False - # UI-Status zurücksetzen + # Reset UI status self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") - self.connection_label.setText("Getrennt") + self.connection_label.setText("Disconnected") self.start_button.setEnabled(False) self.stop_button.setEnabled(False) def start_measurement_thread(self): - """Startet den Mess-Thread""" + """Start the measurement thread.""" if self.measurement_thread is not None: self.measurement_thread.stop() @@ -457,52 +684,69 @@ class BatteryTester(QMainWindow): self.measurement_thread.start() def reconnect_device(self): - """Versucht, das Gerät erneut zu verbinden""" - self.status_bar.setText("Versuche erneut zu verbinden...") + """Attempt to reconnect the device.""" + self.status_bar.setText("Attempting to reconnect...") self.cleanup_device() - QTimer.singleShot(2000, self.safe_init_device) # Mit Verzögerung erneut versuchen + QTimer.singleShot(2000, self.safe_init_device) # Retry with delay def handle_device_error(self, error_msg): - """Behandelt Gerätefehler""" - print(f"Gerätefehler: {error_msg}") - self.status_bar.setText(f"Gerätefehler: {error_msg}") - self.cleanup_device() - - # Nur Meldung anzeigen, wenn Fenster sichtbar ist - if self.isVisible(): - QMessageBox.critical( - self, - "Gerätefehler", - f"Gerätefehler aufgetreten:\n{error_msg}\n\n" - "1. USB-Verbindung prüfen\n" - "2. Manuell erneut verbinden\n" - "3. Anwendung neu starten bei anhaltenden Problemen" - ) + """Thread-safe device error handling.""" + try: + # Ensure this runs in the main thread + if QThread.currentThread() != self.thread(): + self.error_signal.emit(str(error_msg)) + return + + print(f"Device error: {error_msg}") + + def update_ui(): + self.status_bar.setText(f"Device error: {error_msg}") + if self.isVisible(): + QMessageBox.critical( + self, + "Device Error", + f"Device error occurred:\n{error_msg}\n\n" + "1. Check USB connection\n" + "2. Try manual reconnect\n" + "3. Restart application if problems persist" + ) + + QTimer.singleShot(0, lambda: ( + self.cleanup_device(), + update_ui() + )) + + except Exception as e: + print(f"Error in error handler: {str(e)}") + try: + self.cleanup_device() + except: + pass def start_test(self): - """Startet den vollständigen Batterietestzyklus""" + """Start the complete battery test cycle.""" if not self.test_running: try: - # Eingabewerte holen und validieren + # Get and validate input values capacity = float(self.capacity_input.text()) charge_cutoff = float(self.charge_cutoff_input.text()) discharge_cutoff = float(self.discharge_cutoff_input.text()) c_rate = float(self.c_rate_input.text()) if capacity <= 0: - raise ValueError("Batteriekapazität muss positiv sein") + raise ValueError("Battery capacity must be positive") if charge_cutoff <= discharge_cutoff: - raise ValueError("Lade-Endspannung muss höher sein als Entlade-Endspannung") + raise ValueError("Charge cutoff must be higher than discharge cutoff") if c_rate <= 0: - raise ValueError("C-Rate muss positiv sein") + raise ValueError("C-rate must be positive") self.continuous_mode = self.continuous_check.isChecked() test_current = c_rate * capacity if test_current > 0.2: - raise ValueError("Strom muss ≤200mA (0,2A) für ADALM1000 sein") + raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") - # Daten zurücksetzen + # Reset data self.time_data.clear() self.voltage_data.clear() self.current_data.clear() @@ -514,267 +758,68 @@ class BatteryTester(QMainWindow): self.time_label.setText("00:00:00") self.reset_plot() - # Protokolldatei vorbereiten + # Prepare log file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}") - # Test starten + # Start test self.test_running = True self.start_time = time.time() - self.test_phase = "Initiale Entladung" + self.test_phase = "Initial Discharge" self.phase_label.setText(self.test_phase) self.start_button.setEnabled(False) self.stop_button.setEnabled(True) - self.status_bar.setText(f"Test gestartet | Entladen auf {discharge_cutoff}V @ {test_current:.3f}A") + self.status_bar.setText(f"Test started | Discharging to {discharge_cutoff}V @ {test_current:.3f}A") - # Testsequenz in eigenem Thread starten - self.test_thread = threading.Thread(target=self.run_test_sequence, daemon=True) + # Start test sequence in separate thread + self.test_thread = TestSequenceThread(self) + self.test_thread.progress_updated.connect(self.update_test_progress) + self.test_thread.cycle_completed.connect(self.update_cycle_stats) + self.test_thread.error_occurred.connect(self.handle_device_error) + self.test_thread.finished.connect(self.finalize_test) self.test_thread.start() except Exception as e: - QMessageBox.critical(self, "Fehler", str(e)) + QMessageBox.critical(self, "Error", str(e)) - def run_test_sequence(self): - """Haupttestsequenz für Lade-/Entladezyklen""" - try: - test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text()) - charge_cutoff = float(self.charge_cutoff_input.text()) - discharge_cutoff = float(self.discharge_cutoff_input.text()) - - while self.test_running and (self.continuous_mode or self.cycle_count == 0): - self.request_stop = False - self.cycle_count += 1 - self.cycle_label.setText(f"{self.cycle_count}") - - # Neue Protokolldatei für diesen Zyklus - if not self.create_cycle_log_file(): - break - - # Ladephase - self.execute_charge_phase(test_current, charge_cutoff) - if self.request_stop or not self.test_running: - break - - # Ruhephase nach Ladung - self.execute_rest_phase("Nachladung") - if self.request_stop or not self.test_running: - break - - # Entladephase - self.execute_discharge_phase(test_current, discharge_cutoff) - if not self.continuous_check.isChecked(): - self.test_running = False - break - - # Ruhephase nach Entladung - if self.test_running and not self.request_stop: - self.execute_rest_phase("Nach Entladung") - - # Coulomb-Effizienz berechnen - if not self.request_stop and self.charge_capacity > 0: - self.coulomb_efficiency = (self.capacity_ah / self.charge_capacity) * 100 - self.efficiency_label.setText(f"{self.coulomb_efficiency:.1f}") - - # Status aktualisieren - self.status_bar.setText( - f"Zyklus {self.cycle_count} abgeschlossen | " - f"Entladung: {self.capacity_ah:.3f}Ah | " - f"Ladung: {self.charge_capacity:.3f}Ah | " - f"Effizienz: {self.coulomb_efficiency:.1f}%" - ) - - # Zyklusprotokoll schreiben - self.write_cycle_summary() - - # Test abschließen - self.finalize_test() - - except Exception as e: - self.handle_device_error(str(e)) - self.finalize_test() - - def execute_charge_phase(self, current, target_voltage): - """Führt die Ladephase durch""" - if not hasattr(self, 'dev') or not self.session_active: - self.error_signal.emit("Device not connected") - return - self.test_phase = "Laden" - self.phase_label.setText(self.test_phase) - self.status_bar.setText(f"Laden auf {target_voltage}V @ {current:.3f}A") + def update_test_progress(self, progress: float, phase: str): + """Update test progress and phase display.""" + self.test_phase = phase + self.phase_label.setText(phase) + self.progress_bar.setValue(int(progress * 100)) - self.measuring = True - try: - # Reset channel first - self.dev.channels['A'].mode = pysmu.Mode.HI_Z - time.sleep(0.1) - - # Configure for current sourcing - self.dev.channels['A'].mode = pysmu.Mode.SIMV - time.sleep(0.05) - self.dev.channels['A'].constant(current) - time.sleep(0.1) # Allow settling time - actual_current = self.dev.channels['A'].current - if abs(actual_current - current) > 0.01: # 10mA tolerance - self.error_signal.emit(f"Current mismatch: {actual_current:.3f}A vs {current:.3f}A") - - self.charge_capacity = 0.0 - self.charge_capacity_label.setText("0.000") - last_update = time.time() - - while self.test_running and not self.request_stop: - if not self.voltage_data: - time.sleep(0.1) - continue - - now = time.time() - delta_t = now - last_update - last_update = now - - measured_current = abs(self.current_data[-1]) - self.charge_capacity += measured_current * delta_t / 3600 - self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}") - - current_voltage = self.voltage_data[-1] - if current_voltage >= target_voltage or self.request_stop: - break - - time.sleep(0.1) - - except Exception as e: - self.error_signal.emit(f"Charge error: {str(e)}") - - def execute_discharge_phase(self, current, target_voltage): - """Führt die Entladephase durch""" - if not hasattr(self, 'dev') or not self.session_active: - self.error_signal.emit("Device not connected") - return - - self.test_phase = "Entladen" - self.phase_label.setText(self.test_phase) - self.status_bar.setText(f"Entladen auf {target_voltage}V @ {current:.3f}A") + def update_cycle_stats(self, cycle: int, discharge: float, charge: float, efficiency: float): + """Update cycle statistics.""" + self.cycle_count = cycle + self.capacity_ah = discharge + self.charge_capacity = charge + self.coulomb_efficiency = efficiency - self.measuring = True - try: - # Reset channel first - self.dev.channels['A'].mode = pysmu.Mode.HI_Z - time.sleep(0.1) - - # Configure for current sinking - self.dev.channels['A'].mode = pysmu.Mode.SIMV - time.sleep(0.05) - self.dev.channels['A'].constant(-current) - time.sleep(0.1) - - # Current verification - samples = self.dev.read(5) - measured_current = -np.mean([s[0][1] for s in samples]) - if abs(measured_current - current) > 0.02: - self.error_signal.emit(f"Entladestrom Abweichung: {measured_current:.3f}A vs {current:.3f}A") - - self.capacity_ah = 0.0 - self.capacity_label.setText("0.000") - last_update = time.time() - - while self.test_running and not self.request_stop: - if not self.current_data: - time.sleep(0.1) - continue - - now = time.time() - delta_t = now - last_update - last_update = now - - measured_current = abs(self.current_data[-1]) - self.capacity_ah += measured_current * delta_t / 3600 - self.capacity_label.setText(f"{self.capacity_ah:.4f}") - - current_voltage = self.voltage_data[-1] - if current_voltage <= target_voltage or self.request_stop: - break - - time.sleep(0.1) - - except Exception as e: - self.error_signal.emit(f"Discharge error: {str(e)}") - finally: - # Sicherstellen, dass der Kanal zurückgesetzt wird - try: - self.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.dev.channels['A'].constant(0) - except Exception as e: - print(f"Error resetting channel: {e}") - - def execute_rest_phase(self, phase_name): - """Führt eine Ruhephase durch""" - self.test_phase = f"Ruhen ({phase_name})" - self.phase_label.setText(self.test_phase) - self.measuring = False - self.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.dev.channels['A'].constant(0) + self.cycle_label.setText(f"{cycle}") + self.capacity_label.setText(f"{discharge:.4f}") + self.charge_capacity_label.setText(f"{charge:.4f}") + self.efficiency_label.setText(f"{efficiency:.1f}") - rest_time = float(self.rest_time_input.text()) * 3600 - rest_end = time.time() + rest_time + self.status_bar.setText( + f"Cycle {cycle} completed | " + f"Discharge: {discharge:.3f}Ah | " + f"Charge: {charge:.3f}Ah | " + f"Efficiency: {efficiency:.1f}%" + ) - while time.time() < rest_end and self.test_running and not self.request_stop: - time_left = max(0, rest_end - time.time()) - self.status_bar.setText( - f"Ruhen ({phase_name}) | " - f"Verbleibende Zeit: {time_left/60:.1f} min" - ) - time.sleep(1) - - def create_cycle_log_file(self): - """Erstellt eine neue Protokolldatei für den aktuellen Zyklus""" - try: - suffix = 1 - while True: - self.filename = f"{self.base_filename}_{suffix}.csv" - if not os.path.exists(self.filename): - break - suffix += 1 - - self.current_cycle_file = open(self.filename, 'w', newline='') - self.log_writer = csv.writer(self.current_cycle_file) - self.log_writer.writerow([ - "Zeit(s)", "Spannung(V)", "Strom(A)", "Phase", - "Entladekapazität(Ah)", "Ladekapazität(Ah)", - "Coulomb-Eff.(%)", "Zyklus" - ]) - self.log_buffer = [] - return True - except Exception as e: - self.handle_device_error(f"Protokolldatei konnte nicht erstellt werden: {e}") - return False - - def write_cycle_summary(self): - """Schreibt eine Zykluszusammenfassung in die Protokolldatei""" - if hasattr(self, 'current_cycle_file') and self.current_cycle_file: - try: - if self.log_buffer: - self.log_writer.writerows(self.log_buffer) - self.log_buffer.clear() - - summary = ( - f"Zyklus {self.cycle_count} Zusammenfassung - " - f"Entladung={self.capacity_ah:.4f}Ah, " - f"Ladung={self.charge_capacity:.4f}Ah, " - f"Effizienz={self.coulomb_efficiency:.1f}%" - ) - self.current_cycle_file.write(summary + "\n") - self.current_cycle_file.flush() - except Exception as e: - print(f"Fehler beim Schreiben der Zykluszusammenfassung: {e}") + # Write cycle summary + self.write_cycle_summary() def stop_test(self): - """Stoppt den laufenden Test sicher""" + """Safely stop the running test.""" if not self.test_running: return self.request_stop = True self.test_running = False self.measuring = False - self.test_phase = "Bereit" + self.test_phase = "Ready" self.phase_label.setText(self.test_phase) if hasattr(self, 'dev'): @@ -782,70 +827,76 @@ class BatteryTester(QMainWindow): self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) except Exception as e: - print(f"Fehler beim Zurücksetzen des Geräts: {e}") + print(f"Error resetting device: {e}") - # UI aktualisieren - self.status_bar.setText("Test gestoppt - Bereit für neuen Test") + # Update UI + self.status_bar.setText("Test stopped - Ready for new test") self.stop_button.setEnabled(False) self.start_button.setEnabled(True) self.finalize_test(show_message=False) - def finalize_test(self, show_message=True): - """Finale Bereinigung nach Testende mit verbessertem Fenster-Handling""" + def finalize_test(self, show_message: bool = True): + """Final cleanup after test completion.""" try: - # 1. Protokolldaten schreiben (mit zusätzlichem Lock) + # Write log data if hasattr(self, 'log_buffer') and self.log_buffer: try: - with threading.Lock(): # Thread-sicherer Zugriff - self.log_writer.writerows(self.log_buffer) + with open(self.filename, 'a', newline='') as f: + writer = csv.writer(f) + writer.writerows(self.log_buffer) self.log_buffer.clear() except Exception as e: - print(f"Fehler beim Schreiben des Protokollpuffers: {e}") + print(f"Error writing log buffer: {e}") - # 2. Protokolldatei schließen (mit Fehlertoleranz) + # Close log file if hasattr(self, 'current_cycle_file') and self.current_cycle_file: try: - self.current_cycle_file.flush() # Daten sicher schreiben - os.fsync(self.current_cycle_file.fileno()) # Betriebssystem-Puffer leeren + self.current_cycle_file.flush() + os.fsync(self.current_cycle_file.fileno()) self.current_cycle_file.close() except Exception as e: - print(f"Fehler beim Schließen der Protokolldatei: {e}") + print(f"Error closing log file: {e}") - # 3. Benachrichtigung anzeigen (mit Fokus-Sicherung) - if show_message: # Only show if explicitly requested + # Show notification + if show_message: msg_box = QMessageBox(self) msg_box.setWindowFlags(msg_box.windowFlags() | Qt.WindowStaysOnTopHint) msg_box.setIcon(QMessageBox.Information) - msg_box.setWindowTitle("Test abgeschlossen") - msg_box.setText(f"Test beendet\nZyklen: {self.cycle_count}") + msg_box.setWindowTitle("Test Complete") + msg_box.setText(f"Test completed\nCycles: {self.cycle_count}") msg_box.exec_() except Exception as e: - print(f"Kritischer Fehler in finalize_test: {e}") + print(f"Critical error in finalize_test: {e}") finally: - # 4. Gerätestatus zurücksetzen + # Reset test status self.test_running = False self.request_stop = True self.measuring = False - def update_measurements(self, voltage, current, current_time): - """Aktualisiert die Messwerte im UI""" - self.time_data.append(current_time) - self.voltage_data.append(voltage) - self.current_data.append(current) + def update_measurements(self, voltage: float, current: float, current_time: float): + """Update measurements in the UI.""" + if self.time_data.size() > 10000: # Limit data points + self.time_data.dequeue() + self.voltage_data.dequeue() + self.current_data.dequeue() + + self.time_data.enqueue(current_time) + self.voltage_data.enqueue(voltage) + self.current_data.enqueue(current) - # UI aktualisieren + # Update UI self.voltage_label.setText(f"{voltage:.4f}") self.current_label.setText(f"{current:.4f}") self.time_label.setText(self.format_time(current_time)) - # Plot regelmäßig aktualisieren - if len(self.time_data) % 10 == 0: + # Update plot periodically + if self.time_data.size() % 10 == 0: self.update_plot() - # Daten protokollieren, wenn Test läuft + # Log data if test is running if self.test_running and hasattr(self, 'current_cycle_file'): self.log_buffer.append([ f"{current_time:.3f}", @@ -858,7 +909,7 @@ class BatteryTester(QMainWindow): f"{self.cycle_count}" ]) - # Daten in Blöcken schreiben + # Write data in blocks if len(self.log_buffer) >= 10: with open(self.filename, 'a', newline='') as f: writer = csv.writer(f) @@ -866,67 +917,82 @@ class BatteryTester(QMainWindow): self.log_buffer.clear() def update_plot(self): - """Aktualisiert den Plot mit neuen Daten""" - if not self.time_data: + """Update the plot with new data.""" + if self.time_data.empty(): return - self.line_voltage.set_data(self.time_data, self.voltage_data) - self.line_current.set_data(self.time_data, self.current_data) + self.line_voltage.set_data(list(self.time_data), list(self.voltage_data)) + self.line_current.set_data(list(self.time_data), list(self.current_data)) self.auto_scale_axes() self.canvas.draw_idle() def auto_scale_axes(self): - """Passt die Plotachsen automatisch an""" - if not self.time_data: + """Automatically adjust plot axes.""" + if self.time_data.empty(): return - # X-Achse anpassen - max_time = self.time_data[-1] + # X-axis adjustment + max_time = list(self.time_data)[-1] current_xlim = self.ax.get_xlim() if max_time > current_xlim[1] * 0.95: - self.ax.set_xlim(0, max_time * 1.05) - self.ax2.set_xlim(0, max_time * 1.05) + new_xmax = current_xlim[1] + (max_time * 1.05 - current_xlim[1]) * 0.1 + self.ax.set_xlim(0, new_xmax) + self.ax2.set_xlim(0, new_xmax) - # Y-Achsen anpassen - if self.voltage_data: + # Y-axes adjustment + if not self.voltage_data.empty(): voltage_padding = 0.2 - min_v = max(0, min(self.voltage_data) - voltage_padding) - max_v = min(5.0, max(self.voltage_data) + voltage_padding) - self.ax.set_ylim(min_v, max_v) + min_v = max(0, min(list(self.voltage_data)) - voltage_padding) + max_v = min(5.0, max(list(self.voltage_data)) + voltage_padding) + current_ylim = self.ax.get_ylim() + new_min = current_ylim[0] + (min_v - current_ylim[0]) * 0.1 + new_max = current_ylim[1] + (max_v - current_ylim[1]) * 0.1 + self.ax.set_ylim(new_min, new_max) - if self.current_data: + if not self.current_data.empty(): current_padding = 0.05 - min_c = max(-0.25, min(self.current_data) - current_padding) - max_c = min(0.25, max(self.current_data) + current_padding) - self.ax2.set_ylim(min_c, max_c) + min_c = max(-0.25, min(list(self.current_data)) - current_padding) + max_c = min(0.25, max(list(self.current_data)) + current_padding) + current_ylim = self.ax2.get_ylim() + new_min = current_ylim[0] + (min_c - current_ylim[0]) * 0.1 + new_max = current_ylim[1] + (max_c - current_ylim[1]) * 0.1 + self.ax2.set_ylim(new_min, new_max) @staticmethod - def format_time(seconds): - """Formatiert Sekunden als HH:MM:SS""" + def format_time(seconds: float) -> str: + """Format seconds as HH:MM:SS. + + Args: + seconds: Time in seconds + + Returns: + Formatted time string + """ hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = int(seconds % 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def reset_plot(self): - """Setzt den Plot zurück""" + """Reset the plot to initial state.""" self.line_voltage.set_data([], []) self.line_current.set_data([], []) - # Achsen auf Startwerte zurücksetzen - self.ax.set_xlim(0, 10) # X-Achse (Zeit) bei 0 beginnen + # Reset axes to starting values + self.ax.set_xlim(0, 10) voltage_padding = 0.2 min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding - self.ax.set_ylim(min_voltage, max_voltage) + self.ax.set_ylim(min_voltage, max_volume) self.canvas.draw() def closeEvent(self, event): - """Bereinigt beim Schließen des Fensters""" + """Clean up when closing the window.""" self.cleanup_device() event.accept() + if __name__ == "__main__": app = QApplication([]) app.setStyle('Fusion')