diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py index cfd7e92..6e448ab 100644 --- a/MainCode/adalm1000_logger.py +++ b/MainCode/adalm1000_logger.py @@ -4,145 +4,210 @@ import time import csv import threading from datetime import datetime -import tkinter as tk -from tkinter import ttk, messagebox -import pysmu import numpy as np -import matplotlib -matplotlib.use('TkAgg') -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg -from matplotlib.figure import Figure from collections import deque +# Warnungen unterdrücken +os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false' +os.environ['LIBUSB_DEBUG'] = '0' + +from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, + QGridLayout, QLabel, QPushButton, QLineEdit, QFrame, + QCheckBox, QMessageBox, QFileDialog) +from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread +from PyQt5.QtGui import QColor, QPalette + +import pysmu +import matplotlib +matplotlib.use('Qt5Agg') +from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas +from matplotlib.figure import Figure + class DeviceDisconnectedError(Exception): pass -class BatteryTester: - def __init__(self, root): - self._after_ids = set() # Track scheduled callbacks - self._after_lock = threading.Lock() - # Color scheme - self.bg_color = "#2E3440" - self.fg_color = "#D8DEE9" - self.accent_color = "#5E81AC" - self.warning_color = "#BF616A" - self.success_color = "#A3BE8C" +class MeasurementThread(QThread): + update_signal = pyqtSignal(float, float, float) + error_signal = pyqtSignal(str) + + def __init__(self, device, interval=0.1): + super().__init__() + self.device = device + self.interval = interval + self._running = False + self.filter_window_size = 10 + self.start_time = time.time() + + def run(self): + self._running = True + voltage_window = [] + current_window = [] + + while self._running: + try: + samples = self.device.read(self.filter_window_size, 500, True) + if not samples: + raise DeviceDisconnectedError("Keine Samples empfangen") + + raw_voltage = np.mean([s[1][0] for s in samples]) + raw_current = np.mean([s[0][1] for s in samples]) + current_time = time.time() - self.start_time + + # Gleitender Mittelwertfilter + voltage_window.append(raw_voltage) + current_window.append(raw_current) + + if len(voltage_window) > self.filter_window_size: + voltage_window.pop(0) + current_window.pop(0) + + voltage = np.mean(voltage_window) + current = np.mean(current_window) + + self.update_signal.emit(voltage, current, current_time) + time.sleep(max(0.05, self.interval)) + + except Exception as e: + self.error_signal.emit(str(e)) + break + + def stop(self): + self._running = False + if self.isRunning(): + self.quit() + self.wait(500) + +class BatteryTester(QMainWindow): + def __init__(self): + super().__init__() - # Main window configuration - self.root = root - self.root.title("ADALM1000 - Battery Capacity Tester (CC Test)") - self.root.geometry("1000x800") - self.root.minsize(800, 700) - self.root.configure(bg=self.bg_color) - - # Device and measurement state - self.session_active = False - self.measuring = False - self.test_running = False - self.continuous_mode = False - self.request_stop = False - self.interval = 0.1 - self.log_dir = os.path.expanduser("~/adalm1000/logs") - os.makedirs(self.log_dir, exist_ok=True) - - # Battery test parameters - self.capacity = tk.DoubleVar(value=0.2) # Battery capacity in Ah - self.charge_cutoff = tk.DoubleVar(value=1.43) # Charge cutoff voltage - self.discharge_cutoff = tk.DoubleVar(value=0.9) # Discharge cutoff voltage - self.rest_time = tk.DoubleVar(value=0.25) # Rest time in hours - self.c_rate = tk.DoubleVar(value=0.1) # C-rate for test (default C/5 = 0.2) - - # Test progress tracking - self.test_phase = tk.StringVar(value="Idle") - self.capacity_ah = tk.DoubleVar(value=0.0) - self.charge_capacity = tk.DoubleVar(value=0.0) # capacity tracking - self.coulomb_efficiency = tk.DoubleVar(value=0.0) # efficiency calculation - self.cycle_count = tk.IntVar(value=0) # cycle counting - - # Data buffers + # Initialisierung aller Attribute self.time_data = deque() self.voltage_data = deque() self.current_data = deque() self.phase_data = deque() - # Initialize UI and device - self.setup_ui() - self.init_device() + # Testvariablen + self.test_phase = "Bereit" + self.capacity_ah = 0.0 + self.charge_capacity = 0.0 + self.coulomb_efficiency = 0.0 + self.cycle_count = 0 - # Ensure proper cleanup - self.root.protocol("WM_DELETE_WINDOW", self.on_close) + # Farbschema + self.bg_color = QColor(46, 52, 64) + self.fg_color = QColor(216, 222, 233) + self.accent_color = QColor(94, 129, 172) + self.warning_color = QColor(191, 97, 106) + self.success_color = QColor(163, 190, 140) + + # Gerätestatus + self.session_active = False + self.measuring = False + self.test_running = False + self.continuous_mode = False + self.request_stop = False + self.interval = 0.1 + self.log_dir = os.path.expanduser("~/adalm1000/logs") + os.makedirs(self.log_dir, exist_ok=True) + + # Thread-Management + self.measurement_thread = None + self.test_thread = None + + # UI initialisieren + self.setup_ui() + + # Gerät verzögert initialisieren + QTimer.singleShot(100, self.safe_init_device) def setup_ui(self): - """Configure the user interface""" - self.style = ttk.Style() - self.style.theme_use('clam') + """Konfiguriert die Benutzeroberfläche""" + self.setWindowTitle("ADALM1000 - Batteriekapazitätstester (CC-Test)") + self.resize(1000, 800) + self.setMinimumSize(800, 700) - # Configure styles - self.style.configure('.', background=self.bg_color, foreground=self.fg_color) - self.style.configure('TFrame', background=self.bg_color) - self.style.configure('TLabel', background=self.bg_color, foreground=self.fg_color) - self.style.configure('TButton', background=self.accent_color, foreground=self.fg_color, - padding=6, font=('Helvetica', 10, 'bold')) - self.style.map('TButton', - background=[('active', self.accent_color), ('disabled', '#4C566A')], - foreground=[('active', self.fg_color), ('disabled', '#D8DEE9')]) - self.style.configure('TEntry', fieldbackground="#3B4252", foreground=self.fg_color) - self.style.configure('Header.TLabel', font=('Helvetica', 14, 'bold'), foreground=self.accent_color) - self.style.configure('Value.TLabel', font=('Helvetica', 12, 'bold')) - self.style.configure('Status.TLabel', font=('Helvetica', 10)) - self.style.configure('Warning.TButton', background=self.warning_color) - self.style.configure('Success.TButton', background=self.success_color) - - # Main layout - self.content_frame = ttk.Frame(self.root) - self.content_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) - - # Header area - header_frame = ttk.Frame(self.content_frame) - header_frame.pack(fill=tk.X, pady=(0, 20)) + # Hintergrundfarbe setzen + palette = self.palette() + palette.setColor(QPalette.Window, self.bg_color) + palette.setColor(QPalette.WindowText, self.fg_color) + palette.setColor(QPalette.Base, QColor(59, 66, 82)) + palette.setColor(QPalette.Text, self.fg_color) + palette.setColor(QPalette.Button, self.accent_color) + palette.setColor(QPalette.ButtonText, self.fg_color) + self.setPalette(palette) - ttk.Label(header_frame, text="ADALM1000 Battery Capacity Tester (CC Test)", style='Header.TLabel').pack(side=tk.LEFT) + # Hauptwidget und Layout + self.central_widget = QWidget() + self.setCentralWidget(self.central_widget) + self.main_layout = QVBoxLayout(self.central_widget) + self.main_layout.setContentsMargins(10, 10, 10, 10) + self.main_layout.setSpacing(10) - # Status indicator - self.status_light = tk.Canvas(header_frame, width=20, height=20, bg=self.bg_color, bd=0, highlightthickness=0) - self.status_light.pack(side=tk.RIGHT, padx=10) - self.status_indicator = self.status_light.create_oval(2, 2, 18, 18, fill='red') - self.connection_label = ttk.Label(header_frame, text="Disconnected") - self.connection_label.pack(side=tk.RIGHT) + # Kopfbereich + header_frame = QWidget() + header_layout = QHBoxLayout(header_frame) + header_layout.setContentsMargins(0, 0, 0, 0) - # Reconnect button - self.reconnect_btn = ttk.Button(header_frame, text="Reconnect", command=self.reconnect_device) - self.reconnect_btn.pack(side=tk.RIGHT, padx=10) + self.title_label = QLabel("ADALM1000 Batteriekapazitätstester (CC-Test)") + self.title_label.setStyleSheet(f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};") + header_layout.addWidget(self.title_label, 1) - # Measurement display - display_frame = ttk.LabelFrame(self.content_frame, text=" Live Measurements ", padding=15) - display_frame.pack(fill=tk.BOTH, expand=False) + # Statusindikator + self.connection_label = QLabel("Getrennt") + header_layout.addWidget(self.connection_label) + + self.status_light = QLabel() + self.status_light.setFixedSize(20, 20) + self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") + header_layout.addWidget(self.status_light) + + # Verbinden-Button + self.reconnect_btn = QPushButton("Verbinden") + self.reconnect_btn.clicked.connect(self.reconnect_device) + header_layout.addWidget(self.reconnect_btn) + + self.main_layout.addWidget(header_frame) + + # Messanzeige + display_frame = QFrame() + display_frame.setFrameShape(QFrame.StyledPanel) + display_layout = QGridLayout(display_frame) - # Measurement values measurement_labels = [ - ("Voltage (V)", "V"), - ("Current (A)", "A"), - ("Test Phase", ""), - ("Elapsed Time", "s"), - ("Discharge Capacity", "Ah"), - ("Charge Capacity", "Ah"), - ("Coulomb Eff.", "%"), - ("Cycle Count", ""), + ("Spannung (V)", "V"), + ("Strom (A)", "A"), + ("Testphase", ""), + ("Verstrichene Zeit", "s"), + ("Entladekapazität", "Ah"), + ("Ladekapazität", "Ah"), + ("Coulomb-Eff.", "%"), + ("Zykluszählung", ""), ] for i, (label, unit) in enumerate(measurement_labels): - ttk.Label(display_frame, text=f"{label}:", font=('Helvetica', 11)).grid(row=i//2, column=(i%2)*2, sticky=tk.W, pady=5) - value_label = ttk.Label(display_frame, text="0.000", style='Value.TLabel') - value_label.grid(row=i//2, column=(i%2)*2+1, sticky=tk.W, padx=10) + row = i // 2 + col = (i % 2) * 2 + + lbl = QLabel(f"{label}:") + lbl.setStyleSheet("font-size: 11px;") + display_layout.addWidget(lbl, row, col) + + value_label = QLabel("0.000") + value_label.setStyleSheet("font-size: 12px; font-weight: bold;") + display_layout.addWidget(value_label, row, col + 1) + if unit: - ttk.Label(display_frame, text=unit).grid(row=i//2, column=(i%2)*2+2, sticky=tk.W) - + unit_label = QLabel(unit) + display_layout.addWidget(unit_label, row, col + 2) + if i == 0: self.voltage_label = value_label elif i == 1: self.current_label = value_label elif i == 2: self.phase_label = value_label + self.phase_label.setText(self.test_phase) elif i == 3: self.time_label = value_label elif i == 4: @@ -154,126 +219,151 @@ class BatteryTester: elif i == 7: self.cycle_label = value_label - # Control area - controls_frame = ttk.Frame(self.content_frame) - controls_frame.pack(fill=tk.X, pady=(10, 10), padx=0) - - # Parameters frame - params_frame = ttk.LabelFrame(controls_frame, text="Test Parameters", padding=10) - params_frame.pack(side=tk.LEFT, fill=tk.X, expand=True) - - # Battery capacity - ttk.Label(params_frame, text="Battery Capacity (Ah):").grid(row=0, column=0, sticky=tk.W) - ttk.Entry(params_frame, textvariable=self.capacity, width=6).grid(row=0, column=1, padx=5, sticky=tk.W) - - # Charge cutoff - ttk.Label(params_frame, text="Charge Cutoff (V):").grid(row=1, column=0, sticky=tk.W) - ttk.Entry(params_frame, textvariable=self.charge_cutoff, width=6).grid(row=1, column=1, padx=5, sticky=tk.W) - - # Discharge cutoff - ttk.Label(params_frame, text="Discharge Cutoff (V):").grid(row=2, column=0, sticky=tk.W) - ttk.Entry(params_frame, textvariable=self.discharge_cutoff, width=6).grid(row=2, column=1, padx=5, sticky=tk.W) - - # Rest time - ttk.Label(params_frame, text="Rest Time (hours):").grid(row=3, column=0, sticky=tk.W) - ttk.Entry(params_frame, textvariable=self.rest_time, width=6).grid(row=3, column=1, padx=5, sticky=tk.W) - - # C-rate for test (C/5 by default) - ttk.Label(params_frame, text="Test C-rate:").grid(row=0, column=2, sticky=tk.W, padx=(10,0)) - ttk.Entry(params_frame, textvariable=self.c_rate, width=4).grid(row=0, column=3, padx=5, sticky=tk.W) - ttk.Label(params_frame, text="(e.g., 0.2 for C/5)").grid(row=0, column=4, sticky=tk.W) - - # Start/Stop buttons - button_frame = ttk.Frame(controls_frame) - button_frame.pack(side=tk.RIGHT, padx=10) + self.main_layout.addWidget(display_frame) - self.start_button = ttk.Button(button_frame, text="START TEST", command=self.start_test, style='TButton') - self.start_button.pack(side=tk.TOP, pady=5) + # Steuerbereich + controls_frame = QWidget() + controls_layout = QHBoxLayout(controls_frame) + controls_layout.setContentsMargins(0, 0, 0, 0) - self.stop_button = ttk.Button(button_frame, text="STOP TEST", command=self.stop_test, style='Warning.TButton', state=tk.DISABLED) - self.stop_button.pack(side=tk.TOP, pady=5) + # Parameterrahmen + params_frame = QFrame() + params_frame.setFrameShape(QFrame.StyledPanel) + params_layout = QGridLayout(params_frame) - # Continuous mode checkbox - self.continuous_var = tk.BooleanVar(value=True) - ttk.Checkbutton(button_frame, text="Continuous Mode", variable=self.continuous_var).pack(side=tk.TOP, pady=5) + # Batteriekapazität + params_layout.addWidget(QLabel("Batteriekapazität (Ah):"), 0, 0) + self.capacity_input = QLineEdit("0.2") + self.capacity_input.setFixedWidth(80) + params_layout.addWidget(self.capacity_input, 0, 1) - # Plot area - self.plot_frame = ttk.Frame(self.content_frame) - self.plot_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=(0, 5)) + # Lade-Endspannung + params_layout.addWidget(QLabel("Lade-Endspannung (V):"), 1, 0) + self.charge_cutoff_input = QLineEdit("1.43") + self.charge_cutoff_input.setFixedWidth(80) + params_layout.addWidget(self.charge_cutoff_input, 1, 1) + + # Entlade-Endspannung + params_layout.addWidget(QLabel("Entlade-Endspannung (V):"), 2, 0) + self.discharge_cutoff_input = QLineEdit("0.9") + self.discharge_cutoff_input.setFixedWidth(80) + params_layout.addWidget(self.discharge_cutoff_input, 2, 1) + + # Ruhezeit + params_layout.addWidget(QLabel("Ruhezeit (Stunden):"), 3, 0) + self.rest_time_input = QLineEdit("0.25") + self.rest_time_input.setFixedWidth(80) + params_layout.addWidget(self.rest_time_input, 3, 1) + + # C-Rate für Test + params_layout.addWidget(QLabel("Test-C-Rate:"), 0, 2) + self.c_rate_input = QLineEdit("0.1") + self.c_rate_input.setFixedWidth(60) + params_layout.addWidget(self.c_rate_input, 0, 3) + params_layout.addWidget(QLabel("(z.B. 0.2 für C/5)"), 0, 4) + + controls_layout.addWidget(params_frame, 1) + + # Button-Bereich + button_frame = QWidget() + button_layout = QVBoxLayout(button_frame) + button_layout.setContentsMargins(0, 0, 0, 0) + + self.start_button = QPushButton("TEST STARTEN") + self.start_button.clicked.connect(self.start_test) + self.start_button.setStyleSheet(f"background-color: {self.accent_color.name()}; font-weight: bold;") + self.start_button.setEnabled(False) + button_layout.addWidget(self.start_button) + + self.stop_button = QPushButton("TEST STOPPEN") + self.stop_button.clicked.connect(self.stop_test) + self.stop_button.setStyleSheet(f"background-color: {self.warning_color.name()}; font-weight: bold;") + self.stop_button.setEnabled(False) + button_layout.addWidget(self.stop_button) + + # Kontinuierlicher Modus + self.continuous_check = QCheckBox("Kontinuierlicher Modus") + self.continuous_check.setChecked(True) + button_layout.addWidget(self.continuous_check) + + controls_layout.addWidget(button_frame) + self.main_layout.addWidget(controls_frame) + + # Plotbereich self.setup_plot() - - # Status bar - self.status_var = tk.StringVar() - self.status_var.set("Ready") - self.status_label = ttk.Label(self.root, textvariable=self.status_var, style='Status.TLabel', padding=(0, 5), anchor=tk.W) - self.status_label.place(x=20, relx=0, rely=1.0, anchor='sw', relwidth=0.96, height=28) + self.main_layout.addWidget(self.plot_widget, 1) + + # Statusleiste + self.status_bar = QLabel("Bereit") + self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;") + self.main_layout.addWidget(self.status_bar) def setup_plot(self): - """Configure the matplotlib plot""" + """Konfiguriert den Matplotlib-Plot""" + self.plot_widget = QWidget() + plot_layout = QVBoxLayout(self.plot_widget) + self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440') - # Adjust margins to give more space on right side self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) self.ax = self.fig.add_subplot(111) self.ax.set_facecolor('#3B4252') - # Set initial voltage range based on charge/discharge cutoffs ±0.2V + # Initiale Spannungsbereich voltage_padding = 0.2 - min_voltage = max(0, self.discharge_cutoff.get() - voltage_padding) - max_voltage = self.charge_cutoff.get() + voltage_padding + min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) + max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) - # Voltage plot - self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) - self.ax.set_ylabel("Voltage (V)", color='#00BFFF') + # Spannungsplot + self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Spannung (V)', linewidth=2) + self.ax.set_ylabel("Spannung (V)", color='#00BFFF') self.ax.tick_params(axis='y', labelcolor='#00BFFF') - # Current plot (right axis) - set initial range based on test current + # Stromplot (rechte Achse) self.ax2 = self.ax.twinx() current_padding = 0.05 - test_current = self.c_rate.get() * self.capacity.get() - max_current = test_current * 1.5 # Add 50% padding + test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text()) + max_current = test_current * 1.5 self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) - self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) - self.ax2.set_ylabel("Current (A)", color='r') + self.line_current, = self.ax2.plot([], [], 'r-', label='Strom (A)', linewidth=2) + self.ax2.set_ylabel("Strom (A)", color='r') self.ax2.tick_params(axis='y', labelcolor='r') - self.ax.set_xlabel('Time (s)', color=self.fg_color) - self.ax.set_title('Battery Test (CC)', color=self.fg_color) - self.ax.tick_params(axis='x', colors=self.fg_color) + self.ax.set_xlabel('Zeit (s)', color=self.fg_color.name()) + self.ax.set_title('Batterietest (CC)', color=self.fg_color.name()) + self.ax.tick_params(axis='x', colors=self.fg_color.name()) self.ax.grid(True, color='#4C566A') - # Position legends to avoid overlap + # Legenden positionieren self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) - # Embed plot - self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame) - self.canvas.draw() - canvas_widget = self.canvas.get_tk_widget() - canvas_widget.configure(bg='#2E3440', bd=0, highlightthickness=0) - canvas_widget.pack(side=tk.TOP, fill=tk.BOTH, expand=True, pady=(10, 0)) + # Plot einbetten + self.canvas = FigureCanvas(self.fig) + plot_layout.addWidget(self.canvas) + + def safe_init_device(self): + """Sichere Geräteinitialisierung mit Fehlerbehandlung""" + try: + self.init_device() + except Exception as e: + self.handle_device_error(str(e)) def init_device(self): - """Initialize the ADALM1000 device with continuous measurement""" + """Initialisiert das ADALM1000-Gerät""" + self.cleanup_device() + try: - # First try to clean up any existing session - if hasattr(self, 'session'): - try: - self.session.end() - del self.session - except: - pass - - # Add small delay to allow device to reset - time.sleep(1) + # Verzögerung zur Vermeidung von "Device busy" Fehlern + time.sleep(1.5) self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) if not self.session.devices: - raise Exception("No ADALM1000 detected - check connections") + raise Exception("Kein ADALM1000 erkannt - Verbindung prüfen") self.dev = self.session.devices[0] - # Reset channels + # Kanäle zurücksetzen self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) @@ -281,775 +371,473 @@ class BatteryTester: self.session.start(0) - self.status_light.itemconfig(self.status_indicator, fill='green') - self.connection_label.config(text="Connected") - self.status_var.set("Device connected | Ready to measure") + self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") + self.connection_label.setText("Verbunden") + self.status_bar.setText("Gerät verbunden | Bereit zur Messung") self.session_active = True - self.start_button.config(state=tk.NORMAL) + self.start_button.setEnabled(True) - # Start continuous measurement thread - self.measurement_event = threading.Event() - self.measurement_event.set() - self.measurement_thread = threading.Thread( - target=self.continuous_measurement, - daemon=True - ) - self.measurement_thread.start() + # Mess-Thread starten + self.start_measurement_thread() except Exception as e: - self.handle_device_error(e) + raise Exception(f"Geräteinitialisierung fehlgeschlagen: {str(e)}") - def continuous_measurement(self): - """Continuous measurement with moving average filtering and optimized I/O""" - filter_window_size = 10 - voltage_window = [] - current_window = [] - last_plot_update = 0 - last_ui_update = 0 - log_buffer = [] - update_interval = 1.0 # Update UI at 1Hz max - - # Initialize start_time for measurements - if not hasattr(self, 'start_time'): - self.start_time = time.time() - - while self.measurement_event.is_set() and self.root.winfo_exists(): + def cleanup_device(self): + """Bereinigt Geräteressourcen""" + # Mess-Thread stoppen + if self.measurement_thread is not None: try: - # Read multiple samples for better accuracy - samples = self.dev.read(filter_window_size, 500, True) - if not samples: - raise DeviceDisconnectedError("No samples received") - - # Get voltage from Channel B (HI_Z mode) and current from Channel A - raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage - raw_current = np.mean([s[0][1] for s in samples]) # Channel A current - current_time = time.time() - self.start_time - - # Apply moving average filter - voltage_window.append(raw_voltage) - current_window.append(raw_current) - - if len(voltage_window) > filter_window_size: - voltage_window.pop(0) - current_window.pop(0) - - voltage = np.mean(voltage_window) - current = np.mean(current_window) - - # Store filtered data - self.time_data.append(current_time) - self.voltage_data.append(voltage) - self.current_data.append(current) - - # Throttle UI updates to prevent lag - now = time.time() - if now - last_ui_update > update_interval: - self.safe_after(0, lambda: self.update_measurement_display( - voltage, current, current_time - )) - last_ui_update = now - - # Throttle plot updates even more (1Hz max) - if now - last_plot_update > 1.0: - self.safe_after(0, self.update_plot) - last_plot_update = now - - # Buffered logging - if self.test_running and hasattr(self, 'current_cycle_file'): - log_buffer.append([ - f"{current_time:.3f}", - f"{voltage:.6f}", - f"{current:.6f}", - self.test_phase.get(), - f"{self.capacity_ah.get():.4f}", - f"{self.charge_capacity.get():.4f}", - f"{self.coulomb_efficiency.get():.1f}", - f"{self.cycle_count.get()}" - ]) - - # Write in chunks of 10 samples - if len(log_buffer) >= 10: - with open(self.filename, 'a', newline='') as f: - writer = csv.writer(f) - writer.writerows(log_buffer) - log_buffer.clear() - - time.sleep(max(0.05, self.interval)) - + self.measurement_thread.stop() + self.measurement_thread = None except Exception as e: - error_msg = str(e) - if self.root.winfo_exists(): - self.safe_after(0, lambda msg=error_msg: - self.handle_device_error(f"Measurement error: {msg}") if self.root.winfo_exists() else None) - break + print(f"Fehler beim Stoppen des Mess-Threads: {e}") + + # Sitzung bereinigen + if hasattr(self, 'session'): + try: + if self.session_active: + self.session.end() + del self.session + except Exception as e: + print(f"Fehler beim Bereinigen der Sitzung: {e}") + finally: + self.session_active = False + + # UI-Status zurücksetzen + self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") + self.connection_label.setText("Getrennt") + self.start_button.setEnabled(False) + self.stop_button.setEnabled(False) + + def start_measurement_thread(self): + """Startet den Mess-Thread""" + if self.measurement_thread is not None: + self.measurement_thread.stop() + + self.measurement_thread = MeasurementThread(self.dev, self.interval) + self.measurement_thread.update_signal.connect(self.update_measurements) + self.measurement_thread.error_signal.connect(self.handle_device_error) + self.measurement_thread.start() + + def reconnect_device(self): + """Versucht, das Gerät erneut zu verbinden""" + self.status_bar.setText("Versuche erneut zu verbinden...") + self.cleanup_device() + QTimer.singleShot(2000, self.safe_init_device) # Mit Verzögerung erneut versuchen + + def handle_device_error(self, error_msg): + """Behandelt Gerätefehler""" + print(f"Gerätefehler: {error_msg}") + self.status_bar.setText(f"Gerätefehler: {error_msg}") + self.cleanup_device() - # Flush remaining buffer on exit - if log_buffer and hasattr(self, 'current_cycle_file'): - with open(self.filename, 'a', newline='') as f: - writer = csv.writer(f) - writer.writerows(log_buffer) + # Nur Meldung anzeigen, wenn Fenster sichtbar ist + if self.isVisible(): + QMessageBox.critical( + self, + "Gerätefehler", + f"Gerätefehler aufgetreten:\n{error_msg}\n\n" + "1. USB-Verbindung prüfen\n" + "2. Manuell erneut verbinden\n" + "3. Anwendung neu starten bei anhaltenden Problemen" + ) def start_test(self): - """Start the full battery test cycle""" + """Startet den vollständigen Batterietestzyklus""" if not self.test_running: try: - # Validate inputs - if self.capacity.get() <= 0: - raise ValueError("Battery capacity must be positive") - if self.charge_cutoff.get() <= self.discharge_cutoff.get(): - raise ValueError("Charge cutoff must be higher than discharge cutoff") - if self.c_rate.get() <= 0: - raise ValueError("C-rate must be positive") + # Eingabewerte holen und validieren + capacity = float(self.capacity_input.text()) + charge_cutoff = float(self.charge_cutoff_input.text()) + discharge_cutoff = float(self.discharge_cutoff_input.text()) + c_rate = float(self.c_rate_input.text()) - # Set continuous mode based on checkbox - self.continuous_mode = self.continuous_var.get() + if capacity <= 0: + raise ValueError("Batteriekapazität muss positiv sein") + if charge_cutoff <= discharge_cutoff: + raise ValueError("Lade-Endspannung muss höher sein als Entlade-Endspannung") + if c_rate <= 0: + raise ValueError("C-Rate muss positiv sein") - # Reset timing for new test - self.measurement_start_time = time.time() - self.test_start_time = time.time() - - # Calculate target current - test_current = self.c_rate.get() * self.capacity.get() + self.continuous_mode = self.continuous_check.isChecked() + test_current = c_rate * capacity if test_current > 0.2: - raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") + raise ValueError("Strom muss ≤200mA (0,2A) für ADALM1000 sein") - # Clear previous data + # Daten zurücksetzen self.time_data.clear() self.voltage_data.clear() self.current_data.clear() - self.phase_data.clear() - self.capacity_ah.set(0.0) - self.charge_capacity.set(0.0) - self.coulomb_efficiency.set(0.0) - self.cycle_count.set(0) - - # Ensure plot is properly reset for new test - self.reset_plot() + self.capacity_ah = 0.0 + self.charge_capacity = 0.0 + self.coulomb_efficiency = 0.0 + self.cycle_count = 0 + self.reset_plot() - # Generate base filename without cycle number + # Protokolldatei vorbereiten timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}") - self.current_cycle_file = None - # Start test thread + # Test starten self.test_running = True self.start_time = time.time() - self.last_update_time = time.time() - self.test_phase.set("Initial Discharge") + self.test_phase = "Initiale Entladung" + self.phase_label.setText(self.test_phase) - self.start_button.config(state=tk.DISABLED) - self.stop_button.config(state=tk.NORMAL) - self.status_var.set(f"Test started | Discharging to {self.discharge_cutoff.get()}V @ {test_current:.3f}A") + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + self.status_bar.setText(f"Test gestartet | Entladen auf {discharge_cutoff}V @ {test_current:.3f}A") - # Start test sequence in a new thread + # Testsequenz in eigenem Thread starten self.test_thread = threading.Thread(target=self.run_test_sequence, daemon=True) self.test_thread.start() except Exception as e: - messagebox.showerror("Error", str(e)) - - def create_cycle_log_file(self): - """Create a new log file for the current cycle""" - # Close previous file if exists - if hasattr(self, 'current_cycle_file') and self.current_cycle_file: - try: - self.current_cycle_file.close() - except Exception as e: - print(f"Error closing previous log file: {e}") - - # Check write permissions - if not os.access(self.log_dir, os.W_OK): - messagebox.showerror("Error", f"No write permissions in {self.log_dir}") - return False - - # Create new log file with sequential suffix - suffix = 1 - while True: - self.filename = f"{self.base_filename}_{suffix}.csv" - if not os.path.exists(self.filename): - break - suffix += 1 + QMessageBox.critical(self, "Fehler", str(e)) + def run_test_sequence(self): + """Haupttestsequenz für Lade-/Entladezyklen""" try: + test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text()) + charge_cutoff = float(self.charge_cutoff_input.text()) + discharge_cutoff = float(self.discharge_cutoff_input.text()) + + while self.test_running and (self.continuous_mode or self.cycle_count == 0): + self.request_stop = False + self.cycle_count += 1 + self.cycle_label.setText(f"{self.cycle_count}") + + # Neue Protokolldatei für diesen Zyklus + if not self.create_cycle_log_file(): + break + + # Ladephase + self.execute_charge_phase(test_current, charge_cutoff) + if self.request_stop or not self.test_running: + break + + # Ruhephase nach Ladung + self.execute_rest_phase("Nachladung") + if self.request_stop or not self.test_running: + break + + # Entladephase + self.execute_discharge_phase(test_current, discharge_cutoff) + if not self.continuous_check.isChecked(): + self.test_running = False + break + + # Ruhephase nach Entladung + if self.test_running and not self.request_stop: + self.execute_rest_phase("Nach Entladung") + + # Coulomb-Effizienz berechnen + if not self.request_stop and self.charge_capacity > 0: + self.coulomb_efficiency = (self.capacity_ah / self.charge_capacity) * 100 + self.efficiency_label.setText(f"{self.coulomb_efficiency:.1f}") + + # Status aktualisieren + self.status_bar.setText( + f"Zyklus {self.cycle_count} abgeschlossen | " + f"Entladung: {self.capacity_ah:.3f}Ah | " + f"Ladung: {self.charge_capacity:.3f}Ah | " + f"Effizienz: {self.coulomb_efficiency:.1f}%" + ) + + # Zyklusprotokoll schreiben + self.write_cycle_summary() + + # Test abschließen + self.finalize_test() + + except Exception as e: + self.handle_device_error(str(e)) + self.finalize_test() + + def execute_charge_phase(self, current, target_voltage): + """Führt die Ladephase durch""" + self.test_phase = "Laden" + self.phase_label.setText(self.test_phase) + self.status_bar.setText(f"Laden auf {target_voltage}V @ {current:.3f}A") + + self.measuring = True + self.dev.channels['A'].mode = pysmu.Mode.SIMV + self.dev.channels['A'].constant(current) + self.charge_capacity = 0.0 + self.charge_capacity_label.setText("0.000") + last_update = time.time() + + while self.test_running and not self.request_stop: + if not self.voltage_data: + time.sleep(0.1) + continue + + now = time.time() + delta_t = now - last_update + last_update = now + + measured_current = abs(self.current_data[-1]) + self.charge_capacity += measured_current * delta_t / 3600 + self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}") + + current_voltage = self.voltage_data[-1] + if current_voltage >= target_voltage or self.request_stop: + break + + time.sleep(0.1) + + def execute_discharge_phase(self, current, target_voltage): + """Führt die Entladephase durch""" + self.test_phase = "Entladen" + self.phase_label.setText(self.test_phase) + self.status_bar.setText(f"Entladen auf {target_voltage}V @ {current:.3f}A") + + self.measuring = True + self.dev.channels['A'].mode = pysmu.Mode.SIMV + self.dev.channels['A'].constant(-current) + self.capacity_ah = 0.0 + self.capacity_label.setText("0.000") + last_update = time.time() + + while self.test_running and not self.request_stop: + if not self.current_data: + time.sleep(0.1) + continue + + now = time.time() + delta_t = now - last_update + last_update = now + + measured_current = abs(self.current_data[-1]) + self.capacity_ah += measured_current * delta_t / 3600 + self.capacity_label.setText(f"{self.capacity_ah:.4f}") + + current_voltage = self.voltage_data[-1] + if current_voltage <= target_voltage or self.request_stop: + break + + time.sleep(0.1) + + def execute_rest_phase(self, phase_name): + """Führt eine Ruhephase durch""" + self.test_phase = f"Ruhen ({phase_name})" + self.phase_label.setText(self.test_phase) + self.measuring = False + self.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.dev.channels['A'].constant(0) + + rest_time = float(self.rest_time_input.text()) * 3600 + rest_end = time.time() + rest_time + + while time.time() < rest_end and self.test_running and not self.request_stop: + time_left = max(0, rest_end - time.time()) + self.status_bar.setText( + f"Ruhen ({phase_name}) | " + f"Verbleibende Zeit: {time_left/60:.1f} min" + ) + time.sleep(1) + + def create_cycle_log_file(self): + """Erstellt eine neue Protokolldatei für den aktuellen Zyklus""" + try: + suffix = 1 + while True: + self.filename = f"{self.base_filename}_{suffix}.csv" + if not os.path.exists(self.filename): + break + suffix += 1 + self.current_cycle_file = open(self.filename, 'w', newline='') self.log_writer = csv.writer(self.current_cycle_file) - self.log_writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase", - "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", - "Coulomb_Eff(%)", "Cycle"]) + self.log_writer.writerow([ + "Zeit(s)", "Spannung(V)", "Strom(A)", "Phase", + "Entladekapazität(Ah)", "Ladekapazität(Ah)", + "Coulomb-Eff.(%)", "Zyklus" + ]) self.log_buffer = [] return True except Exception as e: - messagebox.showerror("Error", f"Failed to create log file: {e}") + self.handle_device_error(f"Protokolldatei konnte nicht erstellt werden: {e}") return False - @staticmethod - def format_time(seconds): - """Convert seconds to hh:mm:ss format""" - hours = int(seconds // 3600) - minutes = int((seconds % 3600) // 60) - seconds = int(seconds % 60) - return f"{hours:02d}:{minutes:02d}:{seconds:02d}" + def write_cycle_summary(self): + """Schreibt eine Zykluszusammenfassung in die Protokolldatei""" + if hasattr(self, 'current_cycle_file') and self.current_cycle_file: + try: + if self.log_buffer: + self.log_writer.writerows(self.log_buffer) + self.log_buffer.clear() + + summary = ( + f"Zyklus {self.cycle_count} Zusammenfassung - " + f"Entladung={self.capacity_ah:.4f}Ah, " + f"Ladung={self.charge_capacity:.4f}Ah, " + f"Effizienz={self.coulomb_efficiency:.1f}%" + ) + self.current_cycle_file.write(summary + "\n") + self.current_cycle_file.flush() + except Exception as e: + print(f"Fehler beim Schreiben der Zykluszusammenfassung: {e}") def stop_test(self): - """Request immediate stop of the test and clean up all test data""" + """Stoppt den laufenden Test sicher""" if not self.test_running: return self.request_stop = True self.test_running = False self.measuring = False - self.test_phase.set("Idle") # Ensure phase is set to Idle + self.test_phase = "Bereit" + self.phase_label.setText(self.test_phase) - # Immediately set device to safe state if hasattr(self, 'dev'): try: self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) except Exception as e: - print(f"Error resetting device: {e}") + print(f"Fehler beim Zurücksetzen des Geräts: {e}") - # Clear all data buffers - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - self.phase_data.clear() + # UI aktualisieren + self.status_bar.setText("Test gestoppt - Bereit für neuen Test") + self.stop_button.setEnabled(False) + self.start_button.setEnabled(True) - # Reset test values - self.capacity_ah.set(0.0) - self.charge_capacity.set(0.0) - self.coulomb_efficiency.set(0.0) - - # Reset plot - self.reset_plot() - - # Update UI - self.status_var.set("Test stopped - Ready for new test") - self.stop_button.config(state=tk.DISABLED) - self.start_button.config(state=tk.NORMAL) - - # Finalize test data (logs, etc.) - self.safe_after(100, self.finalize_test) - - def center_window(self, window): - """Center a window on screen""" - window.update_idletasks() - width = window.winfo_width() - height = window.winfo_height() - x = (window.winfo_screenwidth() // 2) - (width // 2) - y = (window.winfo_screenheight() // 2) - (height // 2) - window.geometry(f'{width}x{height}+{x}+{y}') - - def run_test_sequence(self): - try: - test_current = self.c_rate.get() * self.capacity.get() - - while self.test_running and (self.continuous_mode or self.cycle_count.get() == 0): - # Reset stop request at start of each cycle - self.request_stop = False - self.cycle_count.set(self.cycle_count.get() + 1) - - # Create new log file for this cycle - self.create_cycle_log_file() - - # 1. Charge phase (constant current) - self.test_phase.set("Charge") - self.status_var.set(f"Charging to {self.charge_cutoff.get()}V @ {test_current:.3f}A") - self.root.update() # Force UI update - - self.measuring = True - self.dev.channels['B'].mode = pysmu.Mode.HI_Z - self.dev.channels['A'].mode = pysmu.Mode.SIMV - self.dev.channels['A'].constant(test_current) - self.charge_capacity.set(0.0) - target_voltage = self.charge_cutoff.get() - self.last_update_time = time.time() - - while self.test_running and not self.request_stop: - if not self.voltage_data: - time.sleep(0.1) - continue - - current_voltage = self.voltage_data[-1] - measured_current = abs(self.current_data[-1]) - - # Update charge capacity - now = time.time() - delta_t = now - self.last_update_time - self.last_update_time = now - self.charge_capacity.set(self.charge_capacity.get() + measured_current * delta_t / 3600) - - self.status_var.set( - f"Charging: {current_voltage:.3f}V / {target_voltage}V | " - f"Current: {measured_current:.3f}A | " - f"Capacity: {self.charge_capacity.get():.4f}Ah" - ) - self.root.update() # Force UI update - - if current_voltage >= target_voltage or self.request_stop: - break - - time.sleep(0.1) # More frequent checks - - if self.request_stop or not self.test_running: - break - - # 2. Rest period after charge - self.test_phase.set("Resting (Post-Charge)") - self.measuring = False - self.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.dev.channels['A'].constant(0) - - rest_end_time = time.time() + (self.rest_time.get() * 3600) - while time.time() < rest_end_time and self.test_running and not self.request_stop: - time_left = max(0, rest_end_time - time.time()) - self.status_var.set( - f"Resting after charge | " - f"Time left: {time_left/60:.1f} min" - ) - self.root.update() - time.sleep(1) # Check every second for stop request - - if self.request_stop or not self.test_running: - break - - # 3. Discharge phase (capacity measurement) - self.test_phase.set("Discharge") - self.status_var.set(f"Discharging to {self.discharge_cutoff.get()}V @ {test_current:.3f}A") - self.root.update() - - self.measuring = True - self.dev.channels['A'].mode = pysmu.Mode.SIMV - self.dev.channels['A'].constant(-test_current) - self.capacity_ah.set(0.0) - self.last_update_time = time.time() - - while self.test_running and not self.request_stop: - if not self.current_data: - time.sleep(0.1) - continue - - current_voltage = self.voltage_data[-1] - current_current = abs(self.current_data[-1]) - - # Capacity calculation - now = time.time() - delta_t = now - self.last_update_time - self.last_update_time = now - self.capacity_ah.set(self.capacity_ah.get() + current_current * delta_t / 3600) - - if not self.continuous_var.get() and self.continuous_mode: # Only update if it was previously enabled - self.continuous_mode = False # Ensure we don't start a new cycle - self.status_var.set( - f"Continuous Mode disabled | " - f"Discharging to {self.discharge_cutoff.get()}V (will stop after this cycle) | " - f"Current: {current_current:.3f}A | " - f"Capacity: {self.capacity_ah.get():.4f}Ah" - ) - self.root.update() # Force UI update - - else: - # Default status message - self.status_var.set( - f"Discharging: {current_voltage:.3f}V / {self.discharge_cutoff.get()}V | " - f"Current: {current_current:.3f}A | " - f"Capacity: {self.capacity_ah.get():.4f}Ah" - ) - - self.root.update() - - if current_voltage <= self.discharge_cutoff.get() or self.request_stop: - break - - if not self.continuous_var.get(): - self.test_running = False - self.test_phase.set("Idle") # Explicitly set to Idle when stopping - break # Exit the main test loop - - # 4. Rest period after discharge (only if not stopping) - if self.test_running and not self.request_stop: - self.test_phase.set("Resting (Post-Discharge)") - self.measuring = False - self.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.dev.channels['A'].constant(0) - - rest_end_time = time.time() + (self.rest_time.get() * 3600) - while time.time() < rest_end_time and self.test_running and not self.request_stop: - time_left = max(0, rest_end_time - time.time()) - self.status_var.set( - f"Resting after discharge | " - f"Time left: {time_left/60:.1f} min" - ) - self.root.update() - time.sleep(1) - - # Calculate Coulomb efficiency if not stopping - if not self.request_stop and self.charge_capacity.get() > 0: - efficiency = (self.capacity_ah.get() / self.charge_capacity.get()) * 100 - self.coulomb_efficiency.set(efficiency) - - # Update cycle info - self.status_var.set( - f"Cycle {self.cycle_count.get()} complete | " - f"Discharge: {self.capacity_ah.get():.3f}Ah | " - f"Charge: {self.charge_capacity.get():.3f}Ah | " - f"Efficiency: {self.coulomb_efficiency.get():.1f}%" - ) - self.root.update() - - # Write cycle summary to log file - self.write_cycle_summary() - - # Flush remaining buffer data - if hasattr(self, 'log_buffer') and self.log_buffer: - with open(self.filename, 'a', newline='') as f: - writer = csv.writer(f) - writer.writerows(self.log_buffer) - self.log_buffer.clear() - - # Finalize test if stopped or completed - self.safe_after(0, self.finalize_test) - - except Exception as e: - error_msg = str(e) - if self.root.winfo_exists(): - self.safe_after(0, lambda msg=error_msg: messagebox.showerror("Test Error", msg)) - self.safe_after(0, self.finalize_test) + # Testdaten finalisieren + QTimer.singleShot(100, self.finalize_test) def finalize_test(self): - """Final cleanup after test completes or is stopped""" - self.measuring = False - if hasattr(self, 'dev'): - try: - self.dev.channels['A'].constant(0) - except Exception as e: - print(f"Error resetting device: {e}") - - # Flush and close current log file - if hasattr(self, 'log_buffer') and self.log_buffer and hasattr(self, 'log_writer'): + """Finale Bereinigung nach Testende""" + # Protokolldaten schreiben + if hasattr(self, 'log_buffer') and self.log_buffer: try: self.log_writer.writerows(self.log_buffer) self.log_buffer.clear() except Exception as e: - print(f"Error flushing log buffer: {e}") + print(f"Fehler beim Schreiben des Protokollpuffers: {e}") + # Protokolldatei schließen if hasattr(self, 'current_cycle_file'): try: self.current_cycle_file.close() except Exception as e: - print(f"Error closing log file: {e}") + print(f"Fehler beim Schließen der Protokolldatei: {e}") - self.start_button.config(state=tk.NORMAL) - self.stop_button.config(state=tk.DISABLED) - self.request_stop = False - - message = ( - f"Test safely stopped after discharge phase | " - f"Cycle {self.cycle_count.get()} completed | " - f"Final capacity: {self.capacity_ah.get():.3f}Ah" + # Benachrichtigung anzeigen + QMessageBox.information( + self, + "Test abgeschlossen", + f"Test wurde sicher beendet.\n\n" + f"Entladekapazität: {self.capacity_ah:.3f}Ah\n" + f"Abgeschlossene Zyklen: {self.cycle_count}" ) - self.status_var.set(message) - - self.safe_after(0, lambda: messagebox.showinfo( - "Test Completed", - f"Test was safely stopped after discharge phase.\n\n" - f"Final discharge capacity: {self.capacity_ah.get():.3f}Ah\n" - f"Total cycles completed: {self.cycle_count.get()}" - )) - def update_measurement_display(self, voltage, current, current_time): - """Update all measurement displays at once (throttled to 1Hz)""" - try: - # Update all values regardless of change - self.voltage_label.config(text=f"{voltage:.4f}") - self.current_label.config(text=f"{current:.4f}") - self.capacity_label.config(text=f"{self.capacity_ah.get():.4f}") - self.charge_capacity_label.config(text=f"{self.charge_capacity.get():.4f}") - self.efficiency_label.config(text=f"{self.coulomb_efficiency.get():.1f}") - self.cycle_label.config(text=f"{self.cycle_count.get()}") - self.phase_label.config(text=self.test_phase.get()) - self.time_label.config(text=self.format_time(current_time)) + def update_measurements(self, voltage, current, current_time): + """Aktualisiert die Messwerte im UI""" + self.time_data.append(current_time) + self.voltage_data.append(voltage) + self.current_data.append(current) - except Exception as e: - print(f"GUI update error: {e}") + # UI aktualisieren + self.voltage_label.setText(f"{voltage:.4f}") + self.current_label.setText(f"{current:.4f}") + self.time_label.setText(self.format_time(current_time)) + + # Plot regelmäßig aktualisieren + if len(self.time_data) % 10 == 0: + self.update_plot() + + # Daten protokollieren, wenn Test läuft + if self.test_running and hasattr(self, 'current_cycle_file'): + self.log_buffer.append([ + f"{current_time:.3f}", + f"{voltage:.6f}", + f"{current:.6f}", + self.test_phase, + f"{self.capacity_ah:.4f}", + f"{self.charge_capacity:.4f}", + f"{self.coulomb_efficiency:.1f}", + f"{self.cycle_count}" + ]) + + # Daten in Blöcken schreiben + if len(self.log_buffer) >= 10: + with open(self.filename, 'a', newline='') as f: + writer = csv.writer(f) + writer.writerows(self.log_buffer) + self.log_buffer.clear() + + def update_plot(self): + """Aktualisiert den Plot mit neuen Daten""" + if not self.time_data: + return + + self.line_voltage.set_data(self.time_data, self.voltage_data) + self.line_current.set_data(self.time_data, self.current_data) + self.auto_scale_axes() + self.canvas.draw_idle() + + def auto_scale_axes(self): + """Passt die Plotachsen automatisch an""" + if not self.time_data: + return + + # X-Achse anpassen + max_time = self.time_data[-1] + current_xlim = self.ax.get_xlim() + if max_time > current_xlim[1] * 0.95: + self.ax.set_xlim(0, max_time * 1.05) + self.ax2.set_xlim(0, max_time * 1.05) + + # Y-Achsen anpassen + if self.voltage_data: + voltage_padding = 0.2 + min_v = max(0, min(self.voltage_data) - voltage_padding) + max_v = min(5.0, max(self.voltage_data) + voltage_padding) + self.ax.set_ylim(min_v, max_v) + + if self.current_data: + current_padding = 0.05 + min_c = max(-0.25, min(self.current_data) - current_padding) + max_c = min(0.25, max(self.current_data) + current_padding) + self.ax2.set_ylim(min_c, max_c) + + @staticmethod + def format_time(seconds): + """Formatiert Sekunden als HH:MM:SS""" + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + seconds = int(seconds % 60) + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def reset_plot(self): - """Reset the plot completely for a new test""" - # Clear the data lines + """Setzt den Plot zurück""" self.line_voltage.set_data([], []) self.line_current.set_data([], []) - # Reset the data buffers - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - - # Set reasonable initial axis ranges + # Initiale Achsenbereiche setzen voltage_padding = 0.2 - min_voltage = max(0, self.discharge_cutoff.get() - voltage_padding) - max_voltage = self.charge_cutoff.get() + voltage_padding - self.ax.set_xlim(0, 10) # 10s initial range + min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding) + max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding + self.ax.set_xlim(0, 10) self.ax.set_ylim(min_voltage, max_voltage) current_padding = 0.05 - test_current = self.c_rate.get() * self.capacity.get() - max_current = test_current * 1.5 # 50% padding + test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text()) + max_current = test_current * 1.5 self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) - # Force redraw self.canvas.draw() - - def write_cycle_summary(self): - """Write cycle summary to the current cycle's log file""" - if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file: - return - - summary_line = ( - f"Cycle {self.cycle_count.get()} Summary - " - f"Discharge={self.capacity_ah.get():.4f}Ah, " - f"Charge={self.charge_capacity.get():.4f}Ah, " - f"Efficiency={self.coulomb_efficiency.get():.1f}%" - ) - - # Ensure file is open and write summary - try: - if self.log_buffer: - self.log_writer.writerows(self.log_buffer) - self.log_buffer.clear() - self.current_cycle_file.write(summary_line + "\n") - self.current_cycle_file.flush() - except Exception as e: - print(f"Error writing cycle summary: {e}") - def update_plot(self): - """Optimized plot update with change detection""" - if not self.time_data: - return - - # Force update more frequently at start of test - if len(self.time_data) < 10 or (time.time() - getattr(self, '_last_plot_time', 0)) > 1.0: - self.line_voltage.set_data(self.time_data, self.voltage_data) - self.line_current.set_data(self.time_data, self.current_data) - self.auto_scale_axes() - self.canvas.draw_idle() - self._last_plot_time = time.time() - - def auto_scale_axes(self): - """Auto-scale plot axes with appropriate padding and strict boundaries""" - if not self.time_data: - return - - # X-axis scaling with 5% padding but don't exceed current max time - min_time = 0 - max_time = self.time_data[-1] - current_xlim = self.ax.get_xlim() - - # Only expand axis if new data exceeds current view - if max_time > current_xlim[1] * 0.95: # 95% threshold to start expanding - new_max = max_time * 1.05 # 5% padding - self.ax.set_xlim(min_time, new_max) - self.ax2.set_xlim(min_time, new_max) - - # Voltage axis scaling with strict boundaries - voltage_padding = 0.2 - if self.voltage_data: - min_voltage = max(0, min(self.voltage_data) - voltage_padding) - max_voltage = min(5.0, max(self.voltage_data) + voltage_padding) # 5V hardware limit - current_ylim = self.ax.get_ylim() - if (abs(current_ylim[0] - min_voltage) > 0.1 or - abs(current_ylim[1] - max_voltage) > 0.1): - self.ax.set_ylim(min_voltage, max_voltage) - - # Current axis scaling with strict boundaries - current_padding = 0.05 - if self.current_data: - min_current = max(-0.25, min(self.current_data) - current_padding) # -250mA limit - max_current = min(0.25, max(self.current_data) + current_padding) # +250mA limit - current_ylim2 = self.ax2.get_ylim() - if (abs(current_ylim2[0] - min_current) > 0.02 or - abs(current_ylim2[1] - max_current) > 0.02): - self.ax2.set_ylim(min_current, max_current) - - def handle_device_error(self, error): - """Handle device connection errors""" - if not self.root.winfo_exists(): # Check if window still exists - return - - error_msg = str(error) - print(f"Device error: {error_msg}") - - # Clean up session first - if hasattr(self, 'session'): - try: - if self.session_active: - self.session.end() - del self.session - except Exception as e: - print(f"Error cleaning up session: {e}") - - # Update UI - self.status_light.itemconfig(self.status_indicator, fill='red') - self.connection_label.config(text="Disconnected") - self.status_var.set(f"Device error: {error_msg}") - - self.session_active = False - self.test_running = False - self.continuous_mode = False - self.measuring = False - - if hasattr(self, 'start_button'): - self.start_button.config(state=tk.DISABLED) - if hasattr(self, 'stop_button'): - self.stop_button.config(state=tk.DISABLED) - - # Clear plot + buffers - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'): - self.line_voltage.set_data([], []) - self.line_current.set_data([], []) - self.ax.set_xlim(0, 1) - self.ax2.set_xlim(0, 1) - self.canvas.draw() - - # Show error message and attempt reconnect automatically - if self.root.winfo_exists(): - self.safe_after(100, self.attempt_reconnect) - - def safe_after(self, delay_ms, callback, *args): - """Safely schedule a callback with window existence check""" - if not self.root.winfo_exists(): - return None - - def wrapped_callback(): - if self.root.winfo_exists(): - try: - callback(*args) - except Exception as e: - print(f"Callback error: {e}") - - after_id = self.root.after(delay_ms, wrapped_callback) - self._after_ids.add(after_id) - return after_id - - def attempt_reconnect(self): - """Attempt to reconnect automatically""" - if not self.root.winfo_exists(): - return - - try: - # Show error message first - messagebox.showerror( - "Device Connection Error", - "Could not connect to ADALM1000\n\n" - "1. Check USB cable connection\n" - "2. The device will attempt to reconnect automatically" - ) - except Exception as e: - print(f"Error showing message: {e}") - return - - # Schedule reconnect attempt - self.safe_after(1000, self.reconnect_device) - - def reconnect_device(self): - """Reconnect the device with proper cleanup""" - if not self.root.winfo_exists(): - return - - self.status_var.set("Attempting to reconnect...") - self.root.update() - - # Clear any existing session - if hasattr(self, 'session'): - try: - if self.session_active: - self.session.end() - del self.session - except: - pass - - # Stop any running threads - self.test_running = False - self.continuous_mode = False - self.measuring = False - - if hasattr(self, 'measurement_event'): - self.measurement_event.clear() - - # Wait for threads to finish - if hasattr(self, 'measurement_thread'): - self.measurement_thread.join(timeout=1.0) - if hasattr(self, 'test_thread'): - self.test_thread.join(timeout=1.0) - - # Add small delay to allow device to reset - time.sleep(1.5) - - # Try to initialize device - try: - self.init_device() - if self.session_active: - self.status_var.set("Reconnected successfully") - return - except Exception as e: - print(f"Reconnect failed: {e}") - - # If we get here, reconnection failed - self.status_var.set("Reconnect failed - will retry...") - self.safe_after(2000, self.reconnect_device) # Retry after 2 seconds - - def on_close(self): - """Clean up on window close""" - # Set flags to stop all threads - self.test_running = False - self.measuring = False - self.session_active = False - - if hasattr(self, 'measurement_event'): - self.measurement_event.clear() - - # Cancel all pending callbacks - with self._after_lock: - for after_id in self._after_ids: - try: - self.root.after_cancel(after_id) - except: - pass - self._after_ids.clear() - - # Give threads time to clean up - timeout = 2.0 - if hasattr(self, 'measurement_thread'): - self.measurement_thread.join(timeout=timeout) - if hasattr(self, 'test_thread'): - self.test_thread.join(timeout=timeout) - - # Clean up device session - if hasattr(self, 'session') and self.session: - try: - self.session.end() - except Exception as e: - print(f"Error ending session: {e}") - - # Finally destroy window - try: - self.root.destroy() - except: - pass + def closeEvent(self, event): + """Bereinigt beim Schließen des Fensters""" + self.cleanup_device() + event.accept() if __name__ == "__main__": - root = tk.Tk() - try: - app = BatteryTester(root) - root.mainloop() - except Exception as e: - if root.winfo_exists(): - messagebox.showerror("Fatal Error", f"Application failed: {str(e)}") - else: - print(f"Fatal Error: {e}") - try: - root.destroy() - except: - pass \ No newline at end of file + app = QApplication([]) + app.setStyle('Fusion') + window = BatteryTester() + window.show() + app.exec_() \ No newline at end of file