diff --git a/Controll.py b/Controll.py new file mode 100644 index 0000000..f9763b3 --- /dev/null +++ b/Controll.py @@ -0,0 +1,42 @@ +from pysmu import Session, MODE +from Adalm1000_Logger.DeviceController import DeviceController +from Adalm1000_Logger.GUI import MainWindow, startGUI +import os + + +# --- Setup --- +sess = Session() +sess.add_all() +controllers = {dev.m_serial: DeviceController(dev) for dev in sess.m_devices} +serials = list(controllers.keys()) + +for i in enumerate(serials): + c = controllers[serials[i]] + c.set_mode(0, MODE.HI_Z) # CH A Hi-Z + c.set_mode(1, MODE.HI_Z) # CH B Hi-Z + MainWindow.add_list_item(MainWindow, serials[i], i, c.start, c.stop) + +startGUI() + +# # Beispiel: Gerät 1 starten, Gerät 2 später starten/stoppen + +# c1 = controllers[serials[0]] +# c1.set_mode(0, MODE.HI_Z) # CH A Hi-Z +# c1.set_mode(1, MODE.HI_Z) # CH B Hi-Z +# c1.start() + +# time.sleep(2.0) + +# if len(serials) > 1: +# c2 = controllers[serials[1]] +# c2.set_mode(0, MODE.HI_Z) +# c2.set_mode(1, MODE.HI_Z) +# c2.start() +# time.sleep(3.0) +# c2.stop() # nur Gerät 2 stoppen + +# # sauber beenden (Strg+C-Handling etc. weglassen der Kürze halber) +# for c in controllers.values(): +# c.stop() +# for c in controllers.values(): +# c.shutdown() \ No newline at end of file diff --git a/DeviceController.py b/DeviceController.py new file mode 100644 index 0000000..de4dcc4 --- /dev/null +++ b/DeviceController.py @@ -0,0 +1,89 @@ +import threading, queue, time, csv, os + +CHUNK = 2000 +OUTDIR = "logs" +os.makedirs(OUTDIR, exist_ok=True) + +class DeviceController: + def __init__(self, dev): + self.dev = dev + self.cmdq = queue.Queue() + self.stop_evt = threading.Event() + self.running = False + self.writer_q = queue.Queue(maxsize=50) + self.reader_t = threading.Thread(target=self.reader_loop, daemon=True) + self.writer_t = threading.Thread(target=self.writer_loop, daemon=True) + + def start(self): + if not self.reader_t.is_alive(): self.reader_t.start() + if not self.writer_t.is_alive(): self.writer_t.start() + self.cmdq.put(("start", None)) + + def stop(self): + self.cmdq.put(("stop", None)) + + def set_mode(self, ch, mode): + self.cmdq.put(("mode", (ch, mode))) + + def shutdown(self): + self.stop() + self.stop_evt.set() + self.reader_t.join() + self.writer_t.join() + + def reader_loop(self): + while not self.stop_evt.is_set(): + # Befehle abarbeiten (non-blocking) + try: + cmd, arg = self.cmdq.get_nowait() + if cmd == "start" and not self.running: + # kontinuierlich laufen lassen (0 == unendlich) + self.dev.run(0) # ← per-Gerät-Start + self.running = True + elif cmd == "stop" and self.running: + self.dev.cancel() # ← per-Gerät-Stop (sofort) + # optional sauber aus: self.dev.off() + self.dev.flush(-1, True) # Read-Queue leeren + self.running = False + elif cmd == "mode": + ch, mode = arg # ch: 0 (A) / 1 (B) + self.dev.set_mode(ch, mode) + except queue.Empty: + pass + + if self.running: + # blockierend n Samples holen + try: + data = self.dev.read([], CHUNK, -1, False) # pysmu mappt auf [ [VA,IA,VB,IB], ... ] + # In einfachem CSV-Beispiel loggen wir nur Spannungen A/B: + # data ist Liste von 4-Float-Arrays + vsA = [row[0] for row in data] + vsB = [row[2] for row in data] + self.writer_q.put((time.time(), vsA, vsB)) + except Exception: + # bei Überlauf o.ä. kurz atmen und weiter + time.sleep(0.001) + else: + time.sleep(0.01) + + def writer_loop(self): + ts0 = None + fn = os.path.join(OUTDIR, f"{time.strftime('%Y%m%d_%H%M%S')}_{self.dev.m_serial}.csv") + with open(fn, "w", newline="") as f: + w = csv.writer(f) + w.writerow(["t_rel_s", "ch", "value"]) + sample_idx = 0 + while not (self.stop_evt.is_set() and self.writer_q.empty()): + try: + ts, va, vb = self.writer_q.get(timeout=0.2) + except Exception: + continue + if ts0 is None: ts0 = ts + for i, v in enumerate(va): + t_rel = (sample_idx + i) * 1e-5 # 100 kS/s → 10 µs + w.writerow([t_rel, "A", v]) + for i, v in enumerate(vb): + t_rel = (sample_idx + i) * 1e-5 + w.writerow([t_rel, "B", v]) + sample_idx += len(va) + print(f"[{self.dev.m_serial}] Datei geschlossen.") \ No newline at end of file diff --git a/GUI.py b/GUI.py new file mode 100644 index 0000000..77d745b --- /dev/null +++ b/GUI.py @@ -0,0 +1,52 @@ +import sys +from PyQt5.QtWidgets import QApplication, QWidget, QListWidget, QListWidgetItem, QPushButton, QLabel, QHBoxLayout, QVBoxLayout + +class ListItemWidget(QWidget): + def __init__(self, text, index, btn1f, btn2f): + super().__init__() + self.text = text + self.index = index # damit wir wissen, zu welchem Eintrag der Button gehört + + layout = QHBoxLayout(self) + + # Label für den Text + label = QLabel(text) + layout.addWidget(label) + + # Zwei Buttons rechts + btn1 = QPushButton("Start") + btn2 = QPushButton("Stop") + + # Funktionen verbinden + btn1.clicked.connect(btn1f) + btn2.clicked.connect(btn2f) + + layout.addWidget(btn1) + layout.addWidget(btn2) + layout.setContentsMargins(5, 2, 5, 2) + layout.addStretch() + +class MainWindow(QWidget): + def __init__(self): + super().__init__() + self.setWindowTitle("Liste mit 2 Köpfen") + self.resize(300, 200) + + main_layout = QVBoxLayout(self) + + # QListWidget + self.list_widget = QListWidget() + main_layout.addWidget(self.list_widget) + + def add_list_item(self, text, index ,btn1f, btn2f): + item = QListWidgetItem(self.list_widget) + widget = ListItemWidget(text) + item.setSizeHint(widget.sizeHint()) + self.list_widget.addItem(item) + self.list_widget.setItemWidget(item, widget) + +def startGUI(): + app = QApplication(sys.argv) + window = MainWindow() + window.show() + sys.exit(app.exec_()) \ No newline at end of file diff --git a/adalm1000_logger.py b/adalm1000_logger.py new file mode 100644 index 0000000..86b26e0 --- /dev/null +++ b/adalm1000_logger.py @@ -0,0 +1,2400 @@ +# -*- coding: utf-8 -*- +import os +import time +import csv +import threading +from datetime import datetime +import numpy as np +import matplotlib +matplotlib.use('Qt5Agg') +from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas +from matplotlib.figure import Figure +from collections import deque +from queue import Queue, Full, Empty + +from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, + QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog, QComboBox) +from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread +from PyQt5 import sip +import pysmu + +class DeviceDisconnectedError(Exception): + pass + +class MeasurementThread(QThread): + update_signal = pyqtSignal(float, float, float) + error_signal = pyqtSignal(str) + + def __init__(self, device, interval=0.1): + super().__init__() + self.device = device + self.interval = interval + self._running = False + self.filter_window_size = 10 + self.voltage_window = [] + self.current_window = [] + self.start_time = None + self.measurement_queue = Queue(maxsize=1) + self.current_direction = 1 # 1 for source, -1 for sink + + def run(self): + """Continuous measurement loop""" + self._running = True + if self.start_time is None: # Nur setzen wenn noch nicht gesetzt + self.start_time = time.time() + + while self._running: + try: + samples = self.device.read(self.filter_window_size, 500, True) + if not samples: + raise DeviceDisconnectedError("No samples received") + + current_time = time.time() - self.start_time + + # Get voltage from Channel B (HI_Z mode) and current from Channel A + raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage + raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction # Channel A current with direction + + # Update filter windows + self.voltage_window.append(raw_voltage) + self.current_window.append(raw_current) + + if len(self.voltage_window) > self.filter_window_size: + self.voltage_window.pop(0) + self.current_window.pop(0) + + voltage = np.mean(self.voltage_window) + current = np.mean(self.current_window) + + # Validate measurements + if voltage is None or not (-1.0 <= voltage <= 6.0): + raise ValueError(f"Invalid voltage: {voltage}V") + if not (-0.25 <= current <= 0.25): + raise ValueError(f"Invalid current: {current}A") + + # Emit update + self.update_signal.emit(voltage, current, current_time) + + # Store measurement + try: + self.measurement_queue.put_nowait((voltage, current)) + except Full: + pass + + time.sleep(max(0.05, self.interval)) + + except Exception as e: + self.error_signal.emit(f"Read error: {str(e)}") + time.sleep(1) + continue + + def set_direction(self, direction): + """Set current direction (1 for source, -1 for sink)""" + self.current_direction = direction + + def stop(self): + self._running = False + self.wait(500) + +class TestSequenceWorker(QObject): + finished = pyqtSignal() + update_phase = pyqtSignal(str) + update_status = pyqtSignal(str) + test_completed = pyqtSignal() + error_occurred = pyqtSignal(str) + + def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent): + super().__init__() + self.device = device + self.test_current = test_current + self.charge_cutoff = charge_cutoff + self.discharge_cutoff = discharge_cutoff + self.rest_time = rest_time * 3600 # Convert hours to seconds + self.continuous_mode = continuous_mode + self.parent = parent + self._running = True + self.voltage_timeout = 0.5 # seconds + + def get_latest_measurement(self): + """Thread-safe measurement reading with timeout""" + try: + return self.parent.measurement_thread.measurement_queue.get( + timeout=self.voltage_timeout + ) + except Empty: + return (None, None) # Return tuple for unpacking + + def charge_phase(self): + """Handle the battery charging phase""" + self.update_phase.emit("Charge") + self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.4f}A") + + try: + # Configure channels - Channel A sources current, Channel B measures voltage + self.device.channels['B'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].constant(self.test_current) + self.parent.measurement_thread.set_direction(1) # Source current + + # Small delay to allow current to stabilize + time.sleep(0.1) + + while self._running: + voltage, current = self.get_latest_measurement() + if voltage is None: + continue + + # Update parent's data for logging/display + with self.parent.plot_mutex: + if len(self.parent.voltage_data) > 0: + self.parent.voltage_data[-1] = voltage + self.parent.current_data[-1] = current + + if voltage >= self.charge_cutoff: + break + + time.sleep(0.1) + + finally: + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) + + def discharge_phase(self): + """Handle the battery discharging phase""" + voltage, _ = self.get_latest_measurement() + if voltage is not None and voltage <= self.discharge_cutoff: + self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)") + return + self.update_phase.emit("Discharge") + self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A") + + try: + # Configure channels - Channel A sinks current, Channel B measures voltage + self.device.channels['B'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].constant(-self.test_current) + self.parent.measurement_thread.set_direction(-1) # Sink current + + # Small delay to allow current to stabilize + time.sleep(0.1) + + while self._running: + voltage, current = self.get_latest_measurement() + if voltage is None: + continue + + # Update parent's data for logging/display + with self.parent.plot_mutex: + if len(self.parent.voltage_data) > 0: + self.parent.voltage_data[-1] = voltage + self.parent.current_data[-1] = current + + if voltage <= self.discharge_cutoff: + break + + time.sleep(0.1) + + finally: + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) + + def rest_phase(self, phase_name): + """Handle rest period between phases""" + self.update_phase.emit(f"Resting ({phase_name})") + rest_end = time.time() + self.rest_time + + while time.time() < rest_end and self._running: + time_left = max(0, rest_end - time.time()) + self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min") + time.sleep(1) + + def stop(self): + """Request the thread to stop""" + self._running = False + try: + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) + self.device.channels['B'].mode = pysmu.Mode.HI_Z + except Exception as e: + print(f"Error stopping device: {e}") + + def run(self): + """Main test sequence loop""" + try: + first_cycle = True # Ensure at least one cycle runs + + while (self._running and + (self.parent.continuous_mode_check.isChecked() or first_cycle)): + self.parent.request_stop = False + self.parent.cycle_count += 1 + first_cycle = False # Only True for the first cycle + + # 1. Charge phase (constant current) + self.charge_phase() + if not self._running or self.parent.request_stop: + break + + # 2. Rest period after charge + self.rest_phase("Post-Charge") + if not self._running or self.parent.request_stop: + break + + # 3. Discharge phase (capacity measurement) + self.discharge_phase() + if not self._running or self.parent.request_stop: + break + + # 4. Rest period after discharge (only if not stopping) + if self._running and not self.parent.request_stop: + self.rest_phase("Post-Discharge") + + # Calculate Coulomb efficiency if not stopping + if not self.parent.request_stop and self.parent.charge_capacity > 0: + self.parent.coulomb_efficiency = ( + self.parent.capacity_ah / self.parent.charge_capacity + ) * 100 + + # Test completed + self.test_completed.emit() + + except Exception as e: + self.error_occurred.emit(f"Test sequence error: {str(e)}") + finally: + self.finished.emit() + +class DischargeWorker(QObject): + finished = pyqtSignal() + update_status = pyqtSignal(str) + test_completed = pyqtSignal() + error_occurred = pyqtSignal(str) + + def __init__(self, device, test_current, discharge_cutoff, parent): + super().__init__() + self.device = device + self.test_current = test_current + self.discharge_cutoff = discharge_cutoff + self.parent = parent + self._running = True + self.voltage_timeout = 0.5 # seconds + + def get_latest_measurement(self): + """Thread-safe measurement reading with timeout""" + try: + return self.parent.measurement_thread.measurement_queue.get( + timeout=self.voltage_timeout + ) + except Empty: + return (None, None) # Return tuple for unpacking + + def discharge_phase(self): + """Handle the battery discharging phase""" + voltage, _ = self.get_latest_measurement() + if voltage is not None and voltage <= self.discharge_cutoff: + self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)") + return + self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A") + + try: + # Configure channels - Channel A sinks current, Channel B measures voltage + self.device.channels['B'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].constant(-self.test_current) + self.parent.measurement_thread.set_direction(-1) # Sink current + + # Small delay to allow current to stabilize + time.sleep(0.1) + + while self._running: + voltage, current = self.get_latest_measurement() + if voltage is None: + continue + + # Update parent's data for logging/display + with self.parent.plot_mutex: + if len(self.parent.voltage_data) > 0: + self.parent.voltage_data[-1] = voltage + self.parent.current_data[-1] = current + + if voltage <= self.discharge_cutoff: + break + + time.sleep(0.1) + + finally: + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) + + def stop(self): + """Request the thread to stop""" + self._running = False + try: + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) + self.device.channels['B'].mode = pysmu.Mode.HI_Z + except Exception as e: + print(f"Error stopping device: {e}") + + def run(self): + """Main discharge sequence""" + try: + self.parent.request_stop = False + self.parent.cycle_count = 1 # Only one discharge cycle + + # Discharge phase + self.discharge_phase() + + if not self._running or self.parent.request_stop: + return + + # Test completed + self.test_completed.emit() + + except Exception as e: + self.error_occurred.emit(f"Discharge error: {str(e)}") + finally: + self.finished.emit() + +class ChargeWorker(QObject): + finished = pyqtSignal() + update_status = pyqtSignal(str) + test_completed = pyqtSignal() + error_occurred = pyqtSignal(str) + + def __init__(self, device, test_current, charge_cutoff, parent): + super().__init__() + self.device = device + self.test_current = test_current + self.charge_cutoff = charge_cutoff + self.parent = parent + self._running = True + + def run(self): + """Main charge sequence""" + try: + self.parent.measurement_thread.set_direction(1) # Source current + + # Configure channels - Channel A sources current, Channel B measures voltage + self.device.channels['B'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].constant(self.test_current) + time.sleep(0.1) # Allow current to stabilize + + while self._running: + voltage, current = self.parent.get_latest_measurement() + if voltage is None: + continue + + # Update parent's data for logging/display + with self.parent.plot_mutex: + if len(self.parent.voltage_data) > 0: + self.parent.voltage_data[-1] = voltage + self.parent.current_data[-1] = current + + if voltage >= self.charge_cutoff: + break + + time.sleep(0.1) + + self.test_completed.emit() + except Exception as e: + self.error_occurred.emit(f"Charge error: {str(e)}") + finally: + self.device.channels['A'].constant(0) + self.finished.emit() + + def stop(self): + """Request the thread to stop""" + self._running = False + try: + self.device.channels['A'].constant(0) + except Exception as e: + print(f"Error stopping charge: {e}") + +class BatteryTester(QMainWindow): + def __init__(self): + self.plot_mutex = threading.Lock() + super().__init__() + + self.last_logged_phase = None + + # Color scheme + self.bg_color = "#2E3440" + self.fg_color = "#D8DEE9" + self.accent_color = "#5E81AC" + self.warning_color = "#BF616A" + self.success_color = "#A3BE8C" + + # Device and measurement state + self.session_active = False + self.measuring = False + self.test_running = False + self.continuous_mode = False + self.request_stop = False + self.interval = 0.1 + self.log_dir = os.path.expanduser("~/adalm1000/logs") + os.makedirs(self.log_dir, exist_ok=True) + + # Data buffers + max_data_points = 36000 # Define this first + self.time_data = deque(maxlen=max_data_points) + self.voltage_data = deque(maxlen=max_data_points) + self.current_data = deque(maxlen=max_data_points) + self.max_points_to_keep = 10000 + self.display_time_data = deque(maxlen=self.max_points_to_keep) + self.display_voltage_data = deque(maxlen=self.max_points_to_keep) + self.display_current_data = deque(maxlen=self.max_points_to_keep) + self.aggregation_buffer = { + 'time': [], 'voltage': [], 'current': [], + 'count': 0, 'last_plot_time': 0 + } + self.phase_data = deque() + self.downsample_factor = 1 # Initial kein Downsampling + self.downsample_counter = 0 + + # Initialize all measurement variables + self.capacity_ah = 0.0 + self.energy = 0.0 + self.charge_capacity = 0.0 + self.coulomb_efficiency = 0.0 + self.cycle_count = 0 + self.start_time = time.time() + self.last_update_time = self.start_time + + # Initialize UI and device + self.setup_ui() + self.init_device() + + # Set window properties + self.setWindowTitle("ADALM1000 - Battery Tester (Multi-Mode)") + self.resize(1000, 800) + self.setMinimumSize(800, 700) + + # Status update timer + self.status_timer = QTimer() + self.status_timer.timeout.connect(self.update_status_and_plot) + self.status_timer.start(1000) #every second + + def get_latest_measurement(self, timeout=0.5): + """Return latest measurement (voltage, current, timestamp_or_none) for the main UI. + Uses the measurement_thread's measurement_queue if available, else falls back to + self.latest_measurement or measurement containers. Returns (None, None) on timeout. + """ + try: + # Preferred: measurement_thread with queue + if hasattr(self, 'measurement_thread') and getattr(self, 'measurement_thread') is not None: + mq = getattr(self.measurement_thread, 'measurement_queue', None) + if mq is not None: + try: + return mq.get(timeout=timeout) + except Empty: + return (None, None) + # Fallback: attribute latest_measurement + if hasattr(self, 'latest_measurement') and getattr(self, 'latest_measurement') is not None: + return getattr(self, 'latest_measurement') + # Fallback: list-like measurements + for attr in ('measurements', 'measurement_buffer', 'voltage_data', 'current_data'): + if hasattr(self, attr): + container = getattr(self, attr) + try: + if hasattr(container, '__len__') and len(container) > 0: + # If separate voltage/current arrays, return last pair if possible + if attr == 'voltage_data' and hasattr(self, 'current_data') and len(self.current_data) > 0: + return (self.voltage_data[-1], self.current_data[-1]) + return container[-1] + except Exception: + pass + except Exception: + # Never raise from the getter - return a safe None tuple + pass + return (None, None) + def setup_ui(self): + """Configure the user interface""" + # Main widget and layout + self.central_widget = QWidget() + self.setCentralWidget(self.central_widget) + self.main_layout = QVBoxLayout(self.central_widget) + self.main_layout.setContentsMargins(10, 10, 10, 10) + + # Mode selection + mode_frame = QFrame() + mode_frame.setFrameShape(QFrame.StyledPanel) + mode_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") + mode_layout = QHBoxLayout(mode_frame) + + self.mode_label = QLabel("Test Mode:") + self.mode_label.setStyleSheet(f"color: {self.fg_color};") + mode_layout.addWidget(self.mode_label) + + self.mode_combo = QComboBox() + self.mode_combo.addItems(["Live Monitoring", "Discharge Test", "Charge Test", "Cycle Test"]) # Added Charge Test + self.mode_combo.setStyleSheet(f""" + QComboBox {{ + background-color: #3B4252; + color: {self.fg_color}; + border: 1px solid #4C566A; + border-radius: 3px; + padding: 2px; + }} + """) + self.mode_combo.currentTextChanged.connect(self.change_mode) + mode_layout.addWidget(self.mode_combo, 1) + + self.main_layout.addWidget(mode_frame) + + # Header area + header_frame = QFrame() + header_frame.setFrameShape(QFrame.NoFrame) + header_layout = QHBoxLayout(header_frame) + header_layout.setContentsMargins(0, 0, 0, 0) + + self.title_label = QLabel("ADALM1000 Battery Tester") + self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};") + header_layout.addWidget(self.title_label, 1) + + # Status indicator + self.status_light = QLabel() + self.status_light.setFixedSize(20, 20) + self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") + header_layout.addWidget(self.status_light) + + self.connection_label = QLabel("Disconnected") + header_layout.addWidget(self.connection_label) + + # Reconnect button + self.reconnect_btn = QPushButton("Reconnect") + self.reconnect_btn.clicked.connect(self.reconnect_device) + header_layout.addWidget(self.reconnect_btn) + + self.main_layout.addWidget(header_frame) + + # Measurement display + display_frame = QFrame() + display_frame.setFrameShape(QFrame.StyledPanel) + display_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") + display_layout = QGridLayout(display_frame) + + # Measurement values - common for all modes + measurement_labels = [ + ("Voltage", "V"), ("Current", "A"), ("Test Phase", ""), + ("Elapsed Time", "s"), ("Capacity", "Ah"), ("Power", "W"), + ("Energy", "Wh"), ("Cycle Count", ""), ("Battery Temp", "°C") + ] + + for i, (label, unit) in enumerate(measurement_labels): + row = i // 3 + col = (i % 3) * 3 + + lbl = QLabel(f"{label}:") + lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;") + display_layout.addWidget(lbl, row, col) + + value_lbl = QLabel("0.000") + value_lbl.setStyleSheet(f""" + color: {self.fg_color}; + font-weight: bold; + font-size: 12px; + min-width: 60px; + """) + display_layout.addWidget(value_lbl, row, col + 1) + + if unit: + unit_lbl = QLabel(unit) + unit_lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;") + display_layout.addWidget(unit_lbl, row, col + 2) + + for i in range(9): + display_layout.setColumnStretch(i, 1 if i % 3 == 1 else 0) + + self.voltage_label = display_layout.itemAtPosition(0, 1).widget() + self.current_label = display_layout.itemAtPosition(0, 4).widget() + self.phase_label = display_layout.itemAtPosition(0, 7).widget() + self.time_label = display_layout.itemAtPosition(1, 1).widget() + self.capacity_label = display_layout.itemAtPosition(1, 4).widget() + self.power_label = display_layout.itemAtPosition(1, 7).widget() + self.energy_label = display_layout.itemAtPosition(2, 1).widget() + self.cycle_label = display_layout.itemAtPosition(2, 4).widget() + self.temp_label = display_layout.itemAtPosition(2, 7).widget() + + self.main_layout.addWidget(display_frame) + + # Control area + controls_frame = QFrame() + controls_frame.setFrameShape(QFrame.NoFrame) + controls_layout = QHBoxLayout(controls_frame) + controls_layout.setContentsMargins(0, 0, 0, 0) + + # Parameters frame + self.params_frame = QFrame() + self.params_frame.setFrameShape(QFrame.StyledPanel) + self.params_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}") + self.params_layout = QGridLayout(self.params_frame) + + # Common parameters + self.capacity = 0.2 + self.capacity_label_input = QLabel("Battery Capacity (Ah):") + self.capacity_label_input.setStyleSheet(f"color: {self.fg_color};") + self.params_layout.addWidget(self.capacity_label_input, 0, 0) + self.capacity_input = QLineEdit("0.2") + self.capacity_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.capacity_input.setFixedWidth(60) + self.params_layout.addWidget(self.capacity_input, 0, 1) + + # C-rate for test + self.c_rate = 0.1 + self.c_rate_label = QLabel("Test C-rate:") + self.c_rate_label.setStyleSheet(f"color: {self.fg_color};") + self.params_layout.addWidget(self.c_rate_label, 1, 0) + self.c_rate_input = QLineEdit("0.1") + self.c_rate_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.c_rate_input.setFixedWidth(40) + self.params_layout.addWidget(self.c_rate_input, 1, 1) + + c_rate_note = QLabel("(e.g., 0.2 for C/5)") + c_rate_note.setStyleSheet(f"color: {self.fg_color};") + self.params_layout.addWidget(c_rate_note, 1, 2) + + # Discharge cutoff (used in Discharge and Cycle modes) + self.discharge_cutoff = 0.9 + self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):") + self.discharge_cutoff_label.setStyleSheet(f"color: {self.fg_color};") + self.params_layout.addWidget(self.discharge_cutoff_label, 2, 0) + self.discharge_cutoff_input = QLineEdit("0.9") + self.discharge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.discharge_cutoff_input.setFixedWidth(60) + self.params_layout.addWidget(self.discharge_cutoff_input, 2, 1) + + # Charge cutoff (only for Cycle mode) + self.charge_cutoff = 1.43 + self.charge_cutoff_label = QLabel("Charge Cutoff (V):") + self.charge_cutoff_label.setStyleSheet(f"color: {self.fg_color};") + self.params_layout.addWidget(self.charge_cutoff_label, 3, 0) + self.charge_cutoff_input = QLineEdit("1.43") + self.charge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.charge_cutoff_input.setFixedWidth(60) + self.params_layout.addWidget(self.charge_cutoff_input, 3, 1) + self.charge_cutoff_label.hide() + self.charge_cutoff_input.hide() + + # Rest time (only for Cycle mode) + self.rest_time = 0.25 + self.rest_time_label = QLabel("Rest Time (hours):") + self.rest_time_label.setStyleSheet(f"color: {self.fg_color};") + self.params_layout.addWidget(self.rest_time_label, 4, 0) + self.rest_time_input = QLineEdit("0.25") + self.rest_time_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.rest_time_input.setFixedWidth(60) + self.params_layout.addWidget(self.rest_time_input, 4, 1) + self.rest_time_label.hide() + self.rest_time_input.hide() + + # Test conditions input + self.test_conditions_label = QLabel("Test Conditions/Chemistry:") + self.test_conditions_label.setStyleSheet(f"color: {self.fg_color};") + self.params_layout.addWidget(self.test_conditions_label, 5, 0) + self.test_conditions_input = QLineEdit("") + self.test_conditions_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};") + self.test_conditions_input.setFixedWidth(120) + self.params_layout.addWidget(self.test_conditions_input, 5, 1) + + controls_layout.addWidget(self.params_frame, 1) + + # Button frame + button_frame = QFrame() + button_frame.setFrameShape(QFrame.NoFrame) + button_layout = QVBoxLayout(button_frame) + button_layout.setContentsMargins(0, 0, 0, 0) + + # Start/Stop buttons + self.start_button = QPushButton("START") + self.start_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.accent_color}; + color: {self.fg_color}; + font-weight: bold; + padding: 6px; + border-radius: 4px; + }} + QPushButton:disabled {{ + background-color: #4C566A; + color: #D8DEE9; + }} + """) + self.start_button.clicked.connect(self.start_test) + button_layout.addWidget(self.start_button) + + self.stop_button = QPushButton("STOP") + self.stop_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.warning_color}; + color: {self.fg_color}; + font-weight: bold; + padding: 6px; + border-radius: 4px; + }} + QPushButton:disabled {{ + background-color: #4C566A; + color: #D8DEE9; + }} + """) + self.stop_button.clicked.connect(self.stop_test) + self.stop_button.setEnabled(False) + button_layout.addWidget(self.stop_button) + + # Continuous mode checkbox (only for Cycle mode) + self.continuous_mode_check = QCheckBox("Continuous Mode") + self.continuous_mode_check.setChecked(True) + self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};") + button_layout.addWidget(self.continuous_mode_check) + self.continuous_mode_check.stateChanged.connect(self.handle_continuous_mode_change) + self.continuous_mode_check.hide() + + # Record button for Live mode + self.record_button = QPushButton("Start Recording") + self.record_button.setCheckable(True) + self.record_button.setStyleSheet(f""" + QPushButton {{ + background-color: {self.success_color}; + color: {self.fg_color}; + font-weight: bold; + padding: 6px; + border-radius: 4px; + }} + QPushButton:checked {{ + background-color: {self.warning_color}; + }} + """) + self.record_button.clicked.connect(self.toggle_recording) + button_layout.addWidget(self.record_button) + self.record_button.hide() + + controls_layout.addWidget(button_frame) + self.main_layout.addWidget(controls_frame) + + # Plot area + self.setup_plot() + + # Status bar + self.status_bar = self.statusBar() + self.status_bar.setStyleSheet(f"color: {self.fg_color};") + self.status_bar.showMessage("Ready") + + # Apply dark theme + self.setStyleSheet(f""" + QMainWindow {{ + background-color: {self.bg_color}; + }} + QLabel {{ + color: {self.fg_color}; + }} + QLineEdit {{ + background-color: #3B4252; + color: {self.fg_color}; + border: 1px solid #4C566A; + border-radius: 3px; + padding: 2px; + }} + """) + + # Set initial mode + self.current_mode = "Live Monitoring" + self.mode_combo.setCurrentText(self.current_mode) + self.change_mode(self.current_mode) # Initialize UI for live mode + + def change_mode(self, mode_name): + """Change between different test modes""" + self.current_mode = mode_name + self.stop_test() # Stop any current operation + + # Hide all optional parameters first + self.charge_cutoff_label.hide() + self.charge_cutoff_input.hide() + self.discharge_cutoff_label.hide() + self.discharge_cutoff_input.hide() + self.rest_time_label.hide() + self.rest_time_input.hide() + self.continuous_mode_check.hide() + self.record_button.hide() + + # Show mode-specific parameters + if mode_name == "Cycle Test": + self.charge_cutoff_label.show() + self.charge_cutoff_input.show() + self.discharge_cutoff_label.show() + self.discharge_cutoff_input.show() + self.rest_time_label.show() + self.rest_time_input.show() + self.continuous_mode_check.show() + self.start_button.setText("START CYCLE TEST") + self.start_button.setEnabled(True) # Explicitly enable + elif mode_name == "Discharge Test": + self.discharge_cutoff_label.show() + self.discharge_cutoff_input.show() + self.start_button.setText("START DISCHARGE") + self.start_button.setEnabled(True) # Explicitly enable + elif mode_name == "Charge Test": + self.charge_cutoff_label.show() + self.charge_cutoff_input.show() + self.start_button.setText("START CHARGE") + self.start_button.setEnabled(True) # Explicitly enable + elif mode_name == "Live Monitoring": + self.record_button.show() + self.start_button.setText("START MONITORING") + # Only enable start button if device is connected + self.start_button.setEnabled(self.session_active) + + # Reset measurement state + self.reset_test() + + self.status_bar.showMessage(f"Mode changed to {mode_name}") + + def reset_test(self): + """Reset test state without stopping measurement""" + # Reset Downsampling + self.downsample_factor = 1 + self.downsample_counter = 0 + + # Clear all data buffers + with self.plot_mutex: + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + if hasattr(self, 'phase_data'): + self.phase_data.clear() + + # Also clear display buffers + if hasattr(self, 'display_time_data'): + self.display_time_data.clear() + self.display_voltage_data.clear() + self.display_current_data.clear() + + # Reset aggregation buffer + self.aggregation_buffer = { + 'time': [], 'voltage': [], 'current': [], + 'count': 0, 'last_plot_time': 0 + } + + # Clear measurement thread buffers if it exists + if hasattr(self, 'measurement_thread'): + self.measurement_thread.voltage_window.clear() + self.measurement_thread.current_window.clear() + with self.measurement_thread.measurement_queue.mutex: + self.measurement_thread.measurement_queue.queue.clear() + self.measurement_thread.start_time = time.time() + + # Reset capacities and timing + self.start_time = time.time() + self.last_update_time = self.start_time + self.capacity_ah = 0.0 + self.energy = 0.0 + if hasattr(self, 'charge_capacity'): + self.charge_capacity = 0.0 + if hasattr(self, 'coulomb_efficiency'): + self.coulomb_efficiency = 0.0 + + # Reset plot + self.reset_plot() + + # Update UI + self.phase_label.setText("Idle") + if hasattr(self, 'test_phase'): + self.test_phase = "Idle" + + def toggle_recording(self): + """Toggle data recording in Live Monitoring mode""" + if self.record_button.isChecked(): + # Start recording + try: + # Reset previous data + self.reset_test() + + # Reset measurement timing + if hasattr(self, 'measurement_thread'): + self.measurement_thread.start_time = time.time() + + if self.create_cycle_log_file(): + self.record_button.setText("Stop Recording") + self.status_bar.showMessage("Live recording started") + # Ensure monitoring is running + if not self.test_running: + self.start_live_monitoring() + else: + self.record_button.setChecked(False) + self.current_cycle_file = None + except Exception as e: + print(f"Error starting recording: {e}") + self.record_button.setChecked(False) + self.current_cycle_file = None + QMessageBox.critical(self, "Error", f"Failed to start recording:\n{str(e)}") + else: + # Stop recording + try: + if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None: + self.finalize_log_file() + self.record_button.setText("Start Recording") + self.status_bar.showMessage("Live recording stopped") + except Exception as e: + print(f"Error stopping recording: {e}") + + def handle_continuous_mode_change(self, state): + """Handle changes to continuous mode checkbox during operation""" + if not state and self.test_running: # If unchecked during test + self.status_bar.showMessage("Continuous mode disabled - will complete current cycle") + self.continuous_mode_check.setStyleSheet(f"color: {self.warning_color};") + QTimer.singleShot(2000, lambda: self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")) + + def setup_plot(self): + """Configure the matplotlib plot""" + self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color) + self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) + self.ax = self.fig.add_subplot(111) + self.ax.set_facecolor('#3B4252') + + # Set initial voltage range + voltage_padding = 0.2 + min_voltage = max(0, 0.9 - voltage_padding) + max_voltage = 1.43 + voltage_padding + self.ax.set_ylim(min_voltage, max_voltage) + + # Voltage plot + self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) + self.ax.set_ylabel("Voltage (V)", color='#00BFFF') + self.ax.tick_params(axis='y', labelcolor='#00BFFF') + + # Current plot (right axis) + self.ax2 = self.ax.twinx() + current_padding = 0.05 + test_current = 0.1 * 0.2 # Default values + max_current = test_current * 1.5 + self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) + + self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) + self.ax2.set_ylabel("Current (A)", color='r') + self.ax2.tick_params(axis='y', labelcolor='r') + + self.ax.set_xlabel('Time (s)', color=self.fg_color) + self.ax.set_title('Battery Test', color=self.fg_color) + self.ax.tick_params(axis='x', colors=self.fg_color) + self.ax.grid(True, color='#4C566A') + + # Position legends + self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) + self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) + + # Embed plot + self.canvas = FigureCanvas(self.fig) + self.canvas.setStyleSheet(f"background-color: {self.bg_color};") + self.main_layout.addWidget(self.canvas, 1) + + def init_device(self): + """Initialize the ADALM1000 device with continuous measurement""" + try: + # Clean up any existing session + if hasattr(self, 'session'): + try: + self.session.end() + del self.session + except: + pass + + time.sleep(1) + + self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) + if not self.session.devices: + raise Exception("No ADALM1000 detected - check connections") + + self.dev = self.session.devices[0] + self.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.dev.channels['B'].mode = pysmu.Mode.HI_Z + self.dev.channels['A'].constant(0) + self.dev.channels['B'].constant(0) + + self.session.start(0) + + self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;") + self.connection_label.setText("Connected") + self.status_bar.showMessage("Device connected | Ready to measure") + self.session_active = True + self.start_button.setEnabled(True) + + # Start measurement thread + self.measurement_thread = MeasurementThread(self.dev, self.interval) + self.measurement_thread.update_signal.connect(self.update_measurements) + self.measurement_thread.error_signal.connect(self.handle_device_error) + self.measurement_thread.start() + + except Exception as e: + self.handle_device_error(str(e)) + + @pyqtSlot(float, float, float) + def update_measurements(self, voltage, current, current_time): + try: + # Only store data if in a test or recording + if not (self.test_running or self.record_button.isChecked()): + return + + # 1. Originale Daten immer vollständig speichern (für Berechnungen und Logging) + with self.plot_mutex: + self.time_data.append(current_time) + self.voltage_data.append(voltage) + self.current_data.append(current) + + # 2. Downsampling für die Anzeige + if not hasattr(self, 'aggregation_buffer'): + self.aggregation_buffer = { + 'time': [], 'voltage': [], 'current': [], + 'count': 0, 'last_plot_time': 0 + } + + self.aggregation_buffer['time'].append(current_time) + self.aggregation_buffer['voltage'].append(voltage) + self.aggregation_buffer['current'].append(current) + self.aggregation_buffer['count'] += 1 + + # Nur aggregieren wenn genug Daten oder Zeit vergangen + now = time.time() + if (self.aggregation_buffer['count'] >= self.downsample_factor or + now - self.aggregation_buffer['last_plot_time'] >= 1.0): + + # Berechne aggregierte Werte (Mittelwert) + agg_time = np.mean(self.aggregation_buffer['time']) + agg_voltage = np.mean(self.aggregation_buffer['voltage']) + agg_current = np.mean(self.aggregation_buffer['current']) + + # Für die Anzeige verwenden + if not hasattr(self, 'display_time_data'): + self.display_time_data = deque(maxlen=self.max_points_to_keep) + self.display_voltage_data = deque(maxlen=self.max_points_to_keep) + self.display_current_data = deque(maxlen=self.max_points_to_keep) + + self.display_time_data.append(agg_time) + self.display_voltage_data.append(agg_voltage) + self.display_current_data.append(agg_current) + + # Reset Buffer + self.aggregation_buffer = { + 'time': [], 'voltage': [], 'current': [], + 'count': 0, 'last_plot_time': now + } + + # 3. Originale Funktionalität für Berechnungen beibehalten + self.voltage_label.setText(f"{voltage:.4f}") + self.current_label.setText(f"{abs(current):.4f}") + self.time_label.setText(self.format_time(current_time)) + + # Calculate and display power and energy + power = voltage * abs(current) + self.power_label.setText(f"{power:.4f}") + + if len(self.time_data) > 1: + delta_t = self.time_data[-1] - self.time_data[-2] + self.energy += power * delta_t / 3600 # Convert to Wh + self.energy_label.setText(f"{self.energy:.4f}") + + # 4. Auto-Skalierung anpassen + if len(self.time_data) > self.max_points_to_keep * 1.5: + self.adjust_downsampling() + + # 5. Plot updates throttled to 10Hz + if not hasattr(self, '_last_plot_update'): + self._last_plot_update = 0 + + if now - self._last_plot_update >= 0.1: + self._last_plot_update = now + QTimer.singleShot(0, self.update_plot) + + except Exception as e: + print(f"Error in update_measurements: {e}") + import traceback + traceback.print_exc() + # Versuche den Aggregationsbuffer zu retten + if hasattr(self, 'aggregation_buffer'): + agg_buffer = self.aggregation_buffer + if agg_buffer['count'] > 0: + try: + with self.plot_mutex: + if not hasattr(self, 'display_time_data'): + self.display_time_data = deque(maxlen=self.max_points_to_keep) + self.display_voltage_data = deque(maxlen=self.max_points_to_keep) + self.display_current_data = deque(maxlen=self.max_points_to_keep) + + self.display_time_data.append(np.mean(agg_buffer['time'])) + self.display_voltage_data.append(np.mean(agg_buffer['voltage'])) + self.display_current_data.append(np.mean(agg_buffer['current'])) + except: + pass + self.aggregation_buffer = { + 'time': [], 'voltage': [], 'current': [], + 'count': 0, 'last_plot_time': time.time() + } + + def adjust_downsampling(self): + current_length = len(self.time_data) + if current_length > self.max_points_to_keep * 1.5: + # Exponentiell erhöhen, aber max. 64 + new_factor = min(64, max(1, self.downsample_factor * 2)) + elif current_length < self.max_points_to_keep // 2: + # Halbieren, aber min. 1 + new_factor = max(1, self.downsample_factor // 2) + else: + return + + if new_factor != self.downsample_factor: + self.downsample_factor = new_factor + self.status_bar.showMessage( + f"Downsampling: Factor {self.downsample_factor}", 2000) + def update_status_and_plot(self): + """Combined status and plot update""" + self.update_status() + self.update_plot() + + def update_status(self): + """Update status information periodically""" + now = time.time() # Define 'now' at the start of the method + + if self.test_running or hasattr(self, 'record_button') and self.record_button.isChecked(): + if self.time_data: + current_time = self.time_data[-1] + if len(self.time_data) > 1: + delta_t = self.time_data[-1] - self.time_data[-2] + if delta_t > 0: + current_current = abs(self.current_data[-1]) + self.capacity_ah += current_current * delta_t / 3600 + self.capacity_label.setText(f"{self.capacity_ah:.4f}") + + # Logging (1x per second) + if (hasattr(self, 'log_writer') and + hasattr(self, 'current_cycle_file') and + self.current_cycle_file is not None and + not self.current_cycle_file.closed): + + # Initialize last log time if not exists + if not hasattr(self, '_last_log_time'): + self._last_log_time = now + + if self.time_data and (now - self._last_log_time >= 1.0): + try: + current_time = self.time_data[-1] + voltage = self.voltage_data[-1] + current = self.current_data[-1] + + if self.current_mode == "Cycle Test": + self.log_writer.writerow([ + f"{current_time:.4f}", + f"{voltage:.6f}", + f"{current:.6f}", + self.test_phase, + f"{self.capacity_ah:.4f}", + f"{self.charge_capacity:.4f}", + f"{self.coulomb_efficiency:.1f}", + f"{self.cycle_count}" + ]) + else: + self.log_writer.writerow([ + f"{current_time:.4f}", + f"{voltage:.6f}", + f"{current:.6f}", + self.test_phase if hasattr(self, 'test_phase') else "Live", + f"{self.capacity_ah:.4f}", + f"{voltage * current:.4f}", # Power + f"{self.energy:.4f}", # Energy + f"{self.cycle_count}" if hasattr(self, 'cycle_count') else "1" + ]) + self.current_cycle_file.flush() + self._last_log_time = now + except Exception as e: + print(f"Error writing to log file: {e}") + if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None: + try: + self.current_cycle_file.close() + except: + pass + self.record_button.setChecked(False) + self.current_cycle_file = None + + def start_test(self): + """Start the selected test mode""" + if self.current_mode == "Cycle Test": + self.start_cycle_test() + elif self.current_mode == "Discharge Test": + self.start_discharge_test() + elif self.current_mode == "Charge Test": + self.start_charge_test() + elif self.current_mode == "Live Monitoring": + self.start_live_monitoring() + + def start_cycle_test(self): + """Start the battery cycle test""" + # Clean up any previous test + + if hasattr(self, 'test_sequence_thread'): + try: + th = getattr(self, 'test_sequence_thread', None) + # Only operate on thread if it still exists and wasn't deleted by SIP + if th is not None and not sip.isdeleted(th): + try: + if th.isRunning(): + wk = getattr(self, 'test_sequence_worker', None) + if wk is not None and not sip.isdeleted(wk): + try: + wk.stop() + except Exception: + pass + try: + th.quit() + except Exception: + pass + try: + th.wait(500) + except Exception: + pass + except RuntimeError: + # Thread object may have been deleted concurrently + pass + except Exception: + pass + finally: + # Safely schedule deletion of worker and thread, if still valid + try: + wk = getattr(self, 'test_sequence_worker', None) + if wk is not None and not sip.isdeleted(wk): + try: + wk.deleteLater() + except Exception: + pass + except Exception: + pass + try: + th = getattr(self, 'test_sequence_thread', None) + if th is not None and not sip.isdeleted(th): + try: + th.deleteLater() + except Exception: + pass + except Exception: + pass + # Remove references to help GC + try: + delattr = delattr + if hasattr(self, 'test_sequence_thread'): + try: + delattr(self, 'test_sequence_thread') + except Exception: + try: + setattr(self, 'test_sequence_thread', None) + except Exception: + pass + except Exception: + pass + + + # Reset stop flag + self.request_stop = False + + if not self.test_running: + try: + # Get parameters from UI + self.capacity = float(self.capacity_input.text()) + self.charge_cutoff = float(self.charge_cutoff_input.text()) + self.discharge_cutoff = float(self.discharge_cutoff_input.text()) + self.rest_time = float(self.rest_time_input.text()) + self.c_rate = float(self.c_rate_input.text()) + + # Validate inputs + if self.capacity <= 0: + raise ValueError("Battery capacity must be positive") + if self.charge_cutoff <= self.discharge_cutoff: + raise ValueError("Charge cutoff must be higher than discharge cutoff") + if self.c_rate <= 0: + raise ValueError("C-rate must be positive") + + test_current = self.c_rate * self.capacity + if test_current > 0.2: + raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") + + # Clear ALL previous data completely + with self.plot_mutex: + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + self.phase_data.clear() + + # Reset capacities and timing + self.start_time = time.time() + self.last_update_time = self.start_time + self.capacity_ah = 0.0 + self.charge_capacity = 0.0 + self.coulomb_efficiency = 0.0 + self.cycle_count = 0 + self.energy = 0.0 + + # Reset measurement thread's timer and queues + if hasattr(self, 'measurement_thread'): + self.measurement_thread.start_time = time.time() + self.measurement_thread.voltage_window.clear() + self.measurement_thread.current_window.clear() + with self.measurement_thread.measurement_queue.mutex: + self.measurement_thread.measurement_queue.queue.clear() + + # Reset plot completely + self.reset_plot() + + # Start test + self.test_running = True + self.start_time = time.time() + self.last_update_time = time.time() + self.test_phase = "Initial Discharge" + self.phase_label.setText(self.test_phase) + + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + self.status_bar.showMessage(f"Cycle test started | Current: {test_current:.4f}A") + + # Create log file + self.create_cycle_log_file() + + # Start test sequence in a QThread + self.test_sequence_thread = QThread() + self.test_sequence_worker = TestSequenceWorker( + self.dev, + test_current, + self.charge_cutoff, + self.discharge_cutoff, + self.rest_time, + self.continuous_mode_check.isChecked(), + self # Pass reference to main window for callbacks + ) + self.test_sequence_worker.moveToThread(self.test_sequence_thread) + + # Connect signals + self.test_sequence_worker.update_phase.connect(self.update_test_phase) + self.test_sequence_worker.update_status.connect(self.status_bar.showMessage) + self.test_sequence_worker.test_completed.connect(self.finalize_test) + self.test_sequence_worker.error_occurred.connect(self.handle_test_error) + self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit) + self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater) + self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater) + + # Start the thread and the worker's run method + self.test_sequence_thread.start() + QTimer.singleShot(0, self.test_sequence_worker.run) + + except Exception as e: + QMessageBox.critical(self, "Error", str(e)) + # Ensure buttons are in correct state if error occurs + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) + + def start_discharge_test(self): + """Start the battery discharge test""" + # Clean up any previous test + + if hasattr(self, 'discharge_thread'): + try: + th = getattr(self, 'discharge_thread', None) + # Only operate on thread if it still exists and wasn't deleted by SIP + if th is not None and not sip.isdeleted(th): + try: + if th.isRunning(): + wk = getattr(self, 'discharge_worker', None) + if wk is not None and not sip.isdeleted(wk): + try: + wk.stop() + except Exception: + pass + try: + th.quit() + except Exception: + pass + try: + th.wait(500) + except Exception: + pass + except RuntimeError: + # Thread object may have been deleted concurrently + pass + except Exception: + pass + finally: + # Safely schedule deletion of worker and thread, if still valid + try: + wk = getattr(self, 'discharge_worker', None) + if wk is not None and not sip.isdeleted(wk): + try: + wk.deleteLater() + except Exception: + pass + except Exception: + pass + try: + th = getattr(self, 'discharge_thread', None) + if th is not None and not sip.isdeleted(th): + try: + th.deleteLater() + except Exception: + pass + except Exception: + pass + # Remove references to help GC + try: + delattr = delattr + if hasattr(self, 'discharge_thread'): + try: + delattr(self, 'discharge_thread') + except Exception: + try: + setattr(self, 'discharge_thread', None) + except Exception: + pass + except Exception: + pass + + + # Reset stop flag + self.request_stop = False + + if not self.test_running: + try: + # Get parameters from UI + self.capacity = float(self.capacity_input.text()) + self.discharge_cutoff = float(self.discharge_cutoff_input.text()) + self.c_rate = float(self.c_rate_input.text()) + + # Validate inputs + if self.capacity <= 0: + raise ValueError("Battery capacity must be positive") + if self.c_rate <= 0: + raise ValueError("C-rate must be positive") + + test_current = self.c_rate * self.capacity + if test_current > 0.2: + raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") + + # Clear ALL previous data completely + with self.plot_mutex: + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + + # Reset capacities and timing + self.start_time = time.time() + self.last_update_time = self.start_time + self.capacity_ah = 0.0 + self.energy = 0.0 + self.cycle_count = 1 + + # Reset measurement thread's timer and queues + if hasattr(self, 'measurement_thread'): + self.measurement_thread.start_time = time.time() + self.measurement_thread.voltage_window.clear() + self.measurement_thread.current_window.clear() + with self.measurement_thread.measurement_queue.mutex: + self.measurement_thread.measurement_queue.queue.clear() + + # Reset plot completely + self.reset_plot() + + # Start test + self.test_running = True + self.start_time = time.time() + self.last_update_time = time.time() + self.test_phase = "Discharge" + self.phase_label.setText(self.test_phase) + + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + self.status_bar.showMessage(f"Discharge started | Current: {test_current:.4f}A") + + # Create log file + self.create_cycle_log_file() + + # Start discharge worker in a QThread + self.discharge_thread = QThread() + self.discharge_worker = DischargeWorker( + self.dev, + test_current, + self.discharge_cutoff, + self # Pass reference to main window for callbacks + ) + self.discharge_worker.moveToThread(self.discharge_thread) + + # Connect signals + self.discharge_worker.update_status.connect(self.status_bar.showMessage) + self.discharge_worker.test_completed.connect(self.finalize_test) + self.discharge_worker.error_occurred.connect(self.handle_test_error) + self.discharge_worker.finished.connect(self.discharge_thread.quit) + self.discharge_worker.finished.connect(self.discharge_worker.deleteLater) + self.discharge_thread.finished.connect(self.discharge_thread.deleteLater) + + # Start the thread and the worker's run method + self.discharge_thread.start() + QTimer.singleShot(0, self.discharge_worker.run) + + except Exception as e: + QMessageBox.critical(self, "Error", str(e)) + # Ensure buttons are in correct state if error occurs + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) + + def start_charge_test(self): + """Start the battery charge test""" + # Clean up any previous test + + if hasattr(self, 'charge_thread'): + try: + th = getattr(self, 'charge_thread', None) + # Only operate on thread if it still exists and wasn't deleted by SIP + if th is not None and not sip.isdeleted(th): + try: + if th.isRunning(): + wk = getattr(self, 'charge_worker', None) + if wk is not None and not sip.isdeleted(wk): + try: + wk.stop() + except Exception: + pass + try: + th.quit() + except Exception: + pass + try: + th.wait(500) + except Exception: + pass + except RuntimeError: + # Thread object may have been deleted concurrently + pass + except Exception: + pass + finally: + # Safely schedule deletion of worker and thread, if still valid + try: + wk = getattr(self, 'charge_worker', None) + if wk is not None and not sip.isdeleted(wk): + try: + wk.deleteLater() + except Exception: + pass + except Exception: + pass + try: + th = getattr(self, 'charge_thread', None) + if th is not None and not sip.isdeleted(th): + try: + th.deleteLater() + except Exception: + pass + except Exception: + pass + # Remove references to help GC + try: + delattr = delattr + if hasattr(self, 'charge_thread'): + try: + delattr(self, 'charge_thread') + except Exception: + try: + setattr(self, 'charge_thread', None) + except Exception: + pass + except Exception: + pass + + + # Reset stop flag + self.request_stop = False + + if not self.test_running: + try: + # Get parameters from UI + self.capacity = float(self.capacity_input.text()) + self.charge_cutoff = float(self.charge_cutoff_input.text()) + self.c_rate = float(self.c_rate_input.text()) + + # Validate inputs + if self.capacity <= 0: + raise ValueError("Battery capacity must be positive") + if self.c_rate <= 0: + raise ValueError("C-rate must be positive") + + test_current = self.c_rate * self.capacity + if test_current > 0.2: + raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") + + # Clear ALL previous data completely + with self.plot_mutex: + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + + # Reset capacities and timing + self.start_time = time.time() + self.last_update_time = self.start_time + self.capacity_ah = 0.0 + self.energy = 0.0 + self.cycle_count = 1 + + # Reset measurement thread + if hasattr(self, 'measurement_thread'): + self.measurement_thread.start_time = time.time() + self.measurement_thread.voltage_window.clear() + self.measurement_thread.current_window.clear() + with self.measurement_thread.measurement_queue.mutex: + self.measurement_thread.measurement_queue.queue.clear() + + # Reset plot + self.reset_plot() + + # Start test + self.test_running = True + self.start_time = time.time() + self.last_update_time = time.time() + self.test_phase = "Charge" + self.phase_label.setText(self.test_phase) + + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + self.status_bar.showMessage(f"Charge started @ {test_current:.3f}A to {self.charge_cutoff}V") + + # Create log file + self.create_cycle_log_file() + + # Start charge worker in a QThread + self.charge_thread = QThread() + self.charge_worker = ChargeWorker( + self.dev, + test_current, + self.charge_cutoff, + self + ) + self.charge_worker.moveToThread(self.charge_thread) + + # Connect signals + self.charge_worker.update_status.connect(self.status_bar.showMessage) + self.charge_worker.test_completed.connect(self.finalize_test) + self.charge_worker.error_occurred.connect(self.handle_test_error) + self.charge_worker.finished.connect(self.charge_thread.quit) + self.charge_worker.finished.connect(self.charge_worker.deleteLater) + self.charge_thread.finished.connect(self.charge_thread.deleteLater) + + # Start the thread + self.charge_thread.start() + QTimer.singleShot(0, self.charge_worker.run) + + except Exception as e: + QMessageBox.critical(self, "Error", str(e)) + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) + + def start_live_monitoring(self): + """Start live monitoring mode""" + try: + # Reset everything completely + self.reset_test() + + # Reset measurement timing + if hasattr(self, 'measurement_thread'): + self.measurement_thread.start_time = time.time() + + # Set monitoring flags + self.test_running = True + self.test_phase = "Live Monitoring" + self.phase_label.setText(self.test_phase) + + # Update UI + self.stop_button.setEnabled(True) + self.start_button.setEnabled(False) + + # Configure device for monitoring + if hasattr(self, 'dev'): + try: + self.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.dev.channels['A'].constant(0) + self.dev.channels['B'].mode = pysmu.Mode.HI_Z + except Exception as e: + print(f"Error configuring device for monitoring: {e}") + + self.status_bar.showMessage("Live monitoring started") + except Exception as e: + print(f"Error starting live monitoring: {e}") + self.test_running = False + QMessageBox.critical(self, "Error", f"Failed to start monitoring:\n{str(e)}") + + def create_cycle_log_file(self): + """Create a new log file for the current test""" + try: + self._last_log_time = time.time() + # Close previous file if exists + if hasattr(self, 'current_cycle_file') and self.current_cycle_file: + try: + self.current_cycle_file.close() + except Exception as e: + print(f"Error closing previous log file: {e}") + + # Ensure log directory exists + os.makedirs(self.log_dir, exist_ok=True) + + if not os.access(self.log_dir, os.W_OK): + QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}") + return False + + # Generate filename based on mode + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + if self.current_mode == "Cycle Test": + self.filename = os.path.join(self.log_dir, f"battery_cycle_{timestamp}.csv") + elif self.current_mode == "Discharge Test": + self.filename = os.path.join(self.log_dir, f"battery_discharge_{timestamp}.csv") + else: # Live Monitoring + self.filename = os.path.join(self.log_dir, f"battery_live_{timestamp}.csv") + + # Open new file + try: + self.current_cycle_file = open(self.filename, 'w', newline='') + + # Write header with test parameters + test_current = self.c_rate * self.capacity + test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" + + self.current_cycle_file.write(f"# ADALM1000 Battery Test Log - {self.current_mode}\n") + self.current_cycle_file.write(f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + self.current_cycle_file.write(f"# Battery Capacity: {self.capacity} Ah\n") + + if self.current_mode != "Live Monitoring": + self.current_cycle_file.write(f"# Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n") + + if self.current_mode == "Cycle Test": + self.current_cycle_file.write(f"# Charge Cutoff: {self.charge_cutoff} V\n") + self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n") + self.current_cycle_file.write(f"# Rest Time: {self.rest_time} hours\n") + elif self.current_mode == "Discharge Test": + self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n") + + self.current_cycle_file.write(f"# Test Conditions/Chemistry: {test_conditions}\n") + self.current_cycle_file.write("#\n") + + # Write data header + self.log_writer = csv.writer(self.current_cycle_file) + + if self.current_mode == "Cycle Test": + self.log_writer.writerow([ + "Time(s)", "Voltage(V)", "Current(A)", "Phase", + "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", + "Coulomb_Eff(%)", "Cycle" + ]) + else: + self.log_writer.writerow([ + "Time(s)", "Voltage(V)", "Current(A)", "Phase", + "Capacity(Ah)", "Power(W)", "Energy(Wh)", "Cycle" + ]) + return True + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to create log file: {e}") + return False + except Exception as e: + print(f"Error in create_cycle_log_file: {e}") + return False + + def finalize_log_file(self): + """Finalize the current log file""" + if hasattr(self, 'current_cycle_file') and self.current_cycle_file: + try: + test_current = self.c_rate * self.capacity + test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" + + self.current_cycle_file.write("\n# TEST SUMMARY\n") + self.current_cycle_file.write(f"# Test Parameters:\n") + self.current_cycle_file.write(f"# - Battery Capacity: {self.capacity} Ah\n") + + if self.current_mode != "Live Monitoring": + self.current_cycle_file.write(f"# - Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n") + + if self.current_mode == "Cycle Test": + self.current_cycle_file.write(f"# - Charge Cutoff: {self.charge_cutoff} V\n") + self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n") + self.current_cycle_file.write(f"# - Rest Time: {self.rest_time} hours\n") + elif self.current_mode == "Discharge Test": + self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n") + + self.current_cycle_file.write(f"# - Test Conditions: {test_conditions}\n") + self.current_cycle_file.write(f"# Results:\n") + + if self.current_mode == "Cycle Test": + self.current_cycle_file.write(f"# - Cycles Completed: {self.cycle_count}\n") + self.current_cycle_file.write(f"# - Final Discharge Capacity: {self.capacity_ah:.4f} Ah\n") + self.current_cycle_file.write(f"# - Final Charge Capacity: {self.charge_capacity:.4f} Ah\n") + self.current_cycle_file.write(f"# - Coulombic Efficiency: {self.coulomb_efficiency:.1f}%\n") + else: + self.current_cycle_file.write(f"# - Capacity: {self.capacity_ah:.4f} Ah\n") + self.current_cycle_file.write(f"# - Energy: {self.energy:.4f} Wh\n") + + self.current_cycle_file.close() + except Exception as e: + print(f"Error closing log file: {e}") + finally: + self.current_cycle_file = None + + def format_time(self, seconds): + """Convert seconds to hh:mm:ss format""" + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + seconds = int(seconds % 60) + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" + + def stop_test(self): + """Request immediate stop of the current test or monitoring""" + if not self.test_running and not (hasattr(self, 'record_button') and self.record_button.isChecked()): + return + + self.request_stop = True + self.test_running = False + self.measuring = False + + # Stop any active test threads + if hasattr(self, 'test_sequence_worker'): + try: + if not sip.isdeleted(self.test_sequence_worker): + self.test_sequence_worker.stop() + except: + pass + + if hasattr(self, 'discharge_worker'): + try: + if not sip.isdeleted(self.discharge_worker): + self.discharge_worker.stop() + except: + pass + + # Stop recording if active + if hasattr(self, 'record_button') and self.record_button.isChecked(): + self.record_button.setChecked(False) + if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None: + self.finalize_log_file() + self.record_button.setText("Start Recording") + + # Reset device to safe state + if hasattr(self, 'dev'): + try: + self.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.dev.channels['A'].constant(0) + self.dev.channels['B'].mode = pysmu.Mode.HI_Z + except Exception as e: + print(f"Error resetting device: {e}") + + # Clear all data buffers + with self.plot_mutex: + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + if hasattr(self, 'phase_data'): + self.phase_data.clear() + + # Reset measurements + self.capacity_ah = 0.0 + self.energy = 0.0 + if hasattr(self, 'charge_capacity'): + self.charge_capacity = 0.0 + if hasattr(self, 'coulomb_efficiency'): + self.coulomb_efficiency = 0.0 + + # Reset plot + self.reset_plot() + + # Update UI + self.test_phase = "Idle" + self.phase_label.setText(self.test_phase) + self.stop_button.setEnabled(False) + self.start_button.setEnabled(True) + + if self.current_mode == "Live Monitoring": + self.status_bar.showMessage("Live monitoring stopped") + else: + self.status_bar.showMessage("Test stopped - Ready for new test") + + def finalize_test(self): + """Final cleanup after test completes or is stopped""" + try: + # 1. Stop any active measurement or test operations + self.measuring = False + self.test_running = False + + # 2. Reset device to safe state + if hasattr(self, 'dev'): + try: + self.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.dev.channels['A'].constant(0) + self.dev.channels['B'].mode = pysmu.Mode.HI_Z + except Exception as e: + print(f"Error resetting device in finalize: {e}") + + # 3. Clean up test sequence thread safely + if hasattr(self, 'test_sequence_thread'): + try: + if self.test_sequence_thread.isRunning(): + if hasattr(self, 'test_sequence_worker'): + try: + self.test_sequence_worker.stop() + except RuntimeError: + pass + + self.test_sequence_thread.quit() + self.test_sequence_thread.wait(500) + except Exception as e: + print(f"Error stopping test sequence thread: {e}") + finally: + if hasattr(self, 'test_sequence_worker'): + try: + if not sip.isdeleted(self.test_sequence_worker): + self.test_sequence_worker.deleteLater() + except: + pass + + if hasattr(self, 'test_sequence_thread'): + try: + if not sip.isdeleted(self.test_sequence_thread): + self.test_sequence_thread.deleteLater() + except: + pass + finally: + if hasattr(self, 'test_sequence_thread'): + del self.test_sequence_thread + + # 4. Clean up discharge thread safely + if hasattr(self, 'discharge_thread'): + try: + if self.discharge_thread.isRunning(): + if hasattr(self, 'discharge_worker'): + try: + self.discharge_worker.stop() + except RuntimeError: + pass + + self.discharge_thread.quit() + self.discharge_thread.wait(500) + except Exception as e: + print(f"Error stopping discharge thread: {e}") + finally: + if hasattr(self, 'discharge_worker'): + try: + if not sip.isdeleted(self.discharge_worker): + self.discharge_worker.deleteLater() + except: + pass + + if hasattr(self, 'discharge_thread'): + try: + if not sip.isdeleted(self.discharge_thread): + self.discharge_thread.deleteLater() + except: + pass + finally: + if hasattr(self, 'discharge_thread'): + del self.discharge_thread + + # 5. Clean up charge thread safely (using same pattern as discharge thread) + if hasattr(self, 'charge_thread'): + try: + if self.charge_thread.isRunning(): + if hasattr(self, 'charge_worker'): + try: + self.charge_worker.stop() + except RuntimeError: + pass + + self.charge_thread.quit() + self.charge_thread.wait(500) + except Exception as e: + print(f"Error stopping charge thread: {e}") + finally: + if hasattr(self, 'charge_worker'): + try: + if not sip.isdeleted(self.charge_worker): + self.charge_worker.deleteLater() + except: + pass + + if hasattr(self, 'charge_thread'): + try: + if not sip.isdeleted(self.charge_thread): + self.charge_thread.deleteLater() + except: + pass + finally: + if hasattr(self, 'charge_thread'): + del self.charge_thread + + # 6. Finalize log file + self.finalize_log_file() + + # 7. Reset UI and state + self.request_stop = False + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) + + # 8. Show completion message if test wasn't stopped by user + if not self.request_stop: + test_current = self.c_rate * self.capacity + test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" + + if self.current_mode == "Cycle Test": + message = ( + f"Cycle test completed | " + f"Cycle {self.cycle_count} | " + f"Capacity: {self.capacity_ah:.4f}Ah | " + f"Efficiency: {self.coulomb_efficiency:.1f}%" + ) + + QMessageBox.information( + self, + "Test Completed", + f"Cycle test completed successfully.\n\n" + f"Test Parameters:\n" + f"- Capacity: {self.capacity} Ah\n" + f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n" + f"- Charge Cutoff: {self.charge_cutoff} V\n" + f"- Discharge Cutoff: {self.discharge_cutoff} V\n" + f"- Conditions: {test_conditions}\n\n" + f"Results:\n" + f"- Cycles: {self.cycle_count}\n" + f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n" + f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%" + ) + elif self.current_mode == "Discharge Test": + message = ( + f"Discharge completed | " + f"Capacity: {self.capacity_ah:.4f}Ah | " + f"Energy: {self.energy:.4f}Wh" + ) + + QMessageBox.information( + self, + "Discharge Completed", + f"Discharge test completed successfully.\n\n" + f"Test Parameters:\n" + f"- Capacity: {self.capacity} Ah\n" + f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n" + f"- Discharge Cutoff: {self.discharge_cutoff} V\n" + f"- Conditions: {test_conditions}\n\n" + f"Results:\n" + f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n" + f"- Energy delivered: {self.energy:.4f}Wh" + ) + + self.status_bar.showMessage(message) + + except Exception as e: + print(f"Error in finalize_test: {e}") + import traceback + traceback.print_exc() + # Ensure we don't leave the UI in a locked state + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) + self.status_bar.showMessage("Error during test finalization") + + def reset_plot(self): + """Completely reset the plot - clears all data and visuals""" + # Clear line data + self.line_voltage.set_data([], []) + self.line_current.set_data([], []) + + # Reset axes with appropriate ranges + voltage_padding = 0.2 + min_voltage = 0 + max_voltage = 5.0 # Max voltage for ADALM1000 + + self.ax.set_xlim(0, 10) # Reset X axis + self.ax.set_ylim(min_voltage, max_voltage) + self.ax.set_xlabel('Time (s)', color=self.fg_color) + self.ax.set_ylabel("Voltage (V)", color='#00BFFF') + self.ax.set_title('Battery Test', color=self.fg_color) + self.ax.tick_params(axis='x', colors=self.fg_color) + self.ax.tick_params(axis='y', labelcolor='#00BFFF') + self.ax.grid(True, color='#4C566A') + + # Reset twin axis (current) + current_padding = 0.05 + self.ax2.set_xlim(0, 10) + self.ax2.set_ylim(-0.25 - current_padding, 0.25 + current_padding) + self.ax2.set_ylabel("Current (A)", color='r') + self.ax2.tick_params(axis='y', labelcolor='r') + + # Redraw legends + self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) + self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) + + # Force immediate redraw + self.canvas.draw() + + def update_status_and_plot(self): + """Combined status and plot update""" + self.update_status() + self.update_plot() + + def update_plot(self): + """More robust plotting with error handling""" + try: + # Create local copies of data safely + with self.plot_mutex: + if not self.display_time_data: + return + + x_data = np.array(self.display_time_data) + y1_data = np.array(self.display_voltage_data) + y2_data = np.array(self.display_current_data) + + # Update plot data + self.line_voltage.set_data(x_data, y1_data) + self.line_current.set_data(x_data, y2_data) + + # Auto-scale when needed + if len(x_data) > 1: + self.auto_scale_axes() + + # Force redraw + self.canvas.draw_idle() + + except Exception as e: + print(f"Plot error: {e}") + # Attempt to recover + self.reset_plot() + + def auto_scale_axes(self): + """Auto-scale plot axes with appropriate padding and strict boundaries""" + if not self.time_data: + return + + min_time = 0 + max_time = self.time_data[-1] + current_xlim = self.ax.get_xlim() + + if max_time > current_xlim[1] * 0.95: + new_max = max_time * 1.05 + self.ax.set_xlim(min_time, new_max) + self.ax2.set_xlim(min_time, new_max) + + voltage_padding = 0.2 + if self.voltage_data: + min_voltage = max(0, min(self.voltage_data) - voltage_padding) + max_voltage = min(5.0, max(self.voltage_data) + voltage_padding) + current_ylim = self.ax.get_ylim() + if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1): + self.ax.set_ylim(min_voltage, max_voltage) + + current_padding = 0.05 + if self.current_data: + min_current = max(-0.25, min(self.current_data) - current_padding) + max_current = min(0.25, max(self.current_data) + current_padding) + current_ylim2 = self.ax2.get_ylim() + if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02): + self.ax2.set_ylim(min_current, max_current) + + @pyqtSlot(str) + def handle_device_error(self, error): + """Handle device connection errors""" + error_msg = str(error) + print(f"Device error: {error_msg}") + + if hasattr(self, 'session'): + try: + if self.session_active: + self.session.end() + del self.session + except Exception as e: + print(f"Error cleaning up session: {e}") + + self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;") + self.connection_label.setText("Disconnected") + self.status_bar.showMessage(f"Device error: {error_msg}") + + self.session_active = False + self.test_running = False + self.continuous_mode = False + self.measuring = False + + self.start_button.setEnabled(False) + self.stop_button.setEnabled(False) + + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + + @pyqtSlot(str) + def update_test_phase(self, phase_text): + """Update the test phase display""" + self.test_phase = phase_text + self.phase_label.setText(phase_text) + + @pyqtSlot(str) + def handle_test_error(self, error_msg): + """Handle errors from the test sequence with complete cleanup""" + try: + # 1. Notify user + QMessageBox.critical(self, "Test Error", + f"An error occurred:\n{error_msg}\n\nAttempting to recover...") + + # 2. Stop all operations + self.stop_test() + + # 3. Reset UI elements + if hasattr(self, 'line_voltage'): + try: + self.line_voltage.set_data([], []) + self.line_current.set_data([], []) + self.ax.set_xlim(0, 1) + self.ax2.set_xlim(0, 1) + self.canvas.draw() + except Exception as plot_error: + print(f"Plot reset error: {plot_error}") + + # 4. Update status + self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...") + self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") + + # 5. Attempt recovery + QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect + + except Exception as e: + print(f"Error in error handler: {e}") + # Fallback - restart application? + QMessageBox.critical(self, "Fatal Error", + "The application needs to restart due to an unrecoverable error") + QTimer.singleShot(1000, self.close) + + def attempt_reconnect(self): + """Attempt to reconnect automatically""" + QMessageBox.critical( + self, + "Device Connection Error", + "Could not connect to ADALM1000\n\n" + "1. Check USB cable connection\n" + "2. The device will attempt to reconnect automatically" + ) + + QTimer.singleShot(1000, self.reconnect_device) + + def reconnect_device(self): + """Reconnect the device with proper cleanup""" + self.status_bar.showMessage("Attempting to reconnect...") + + if hasattr(self, 'session'): + try: + if self.session_active: + self.session.end() + del self.session + except: + pass + + self.test_running = False + self.continuous_mode = False + self.measuring = False + + if hasattr(self, 'measurement_thread'): + self.measurement_thread.stop() + + time.sleep(1.5) + + try: + self.init_device() + if self.session_active: + self.status_bar.showMessage("Reconnected successfully") + return + except Exception as e: + print(f"Reconnect failed: {e}") + + self.status_bar.showMessage("Reconnect failed - will retry...") + QTimer.singleShot(2000, self.reconnect_device) + + def closeEvent(self, event): + """Clean up on window close""" + self.test_running = False + self.measuring = False + self.session_active = False + + # Stop measurement thread + if hasattr(self, 'measurement_thread'): + self.measurement_thread.stop() + + # Stop test sequence thread + + if hasattr(self, 'test_sequence_thread'): + try: + th = getattr(self, 'test_sequence_thread', None) + # Only operate on thread if it still exists and wasn't deleted by SIP + if th is not None and not sip.isdeleted(th): + try: + if th.isRunning(): + wk = getattr(self, 'test_sequence_worker', None) + if wk is not None and not sip.isdeleted(wk): + try: + wk.stop() + except Exception: + pass + try: + th.quit() + except Exception: + pass + try: + th.wait(500) + except Exception: + pass + except RuntimeError: + # Thread object may have been deleted concurrently + pass + except Exception: + pass + finally: + # Safely schedule deletion of worker and thread, if still valid + try: + wk = getattr(self, 'test_sequence_worker', None) + if wk is not None and not sip.isdeleted(wk): + try: + wk.deleteLater() + except Exception: + pass + except Exception: + pass + try: + th = getattr(self, 'test_sequence_thread', None) + if th is not None and not sip.isdeleted(th): + try: + th.deleteLater() + except Exception: + pass + except Exception: + pass + # Remove references to help GC + try: + delattr = delattr + if hasattr(self, 'test_sequence_thread'): + try: + delattr(self, 'test_sequence_thread') + except Exception: + try: + setattr(self, 'test_sequence_thread', None) + except Exception: + pass + except Exception: + pass + + # Stop discharge thread + if hasattr(self, 'discharge_thread'): + if hasattr(self, 'discharge_worker'): + self.discharge_worker.stop() + self.discharge_thread.quit() + self.discharge_thread.wait(500) + + # Clean up device session + if hasattr(self, 'session') and self.session: + try: + self.session.end() + except Exception as e: + print(f"Error ending session: {e}") + + event.accept() + +if __name__ == "__main__": + app = QApplication([]) + try: + window = BatteryTester() + window.show() + app.exec_() + except Exception as e: + QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}") \ No newline at end of file