Adalm1000_Logger/adalm1000_logger.py
Vincent Hanewinkel f09b7c481b init commit
2025-08-14 20:15:43 +02:00

2400 lines
101 KiB
Python

# -*- coding: utf-8 -*-
import os
import time
import csv
import threading
from datetime import datetime
import numpy as np
import matplotlib
matplotlib.use('Qt5Agg')
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
from collections import deque
from queue import Queue, Full, Empty
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel,
QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog, QComboBox)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread
from PyQt5 import sip
import pysmu
class DeviceDisconnectedError(Exception):
pass
class MeasurementThread(QThread):
update_signal = pyqtSignal(float, float, float)
error_signal = pyqtSignal(str)
def __init__(self, device, interval=0.1):
super().__init__()
self.device = device
self.interval = interval
self._running = False
self.filter_window_size = 10
self.voltage_window = []
self.current_window = []
self.start_time = None
self.measurement_queue = Queue(maxsize=1)
self.current_direction = 1 # 1 for source, -1 for sink
def run(self):
"""Continuous measurement loop"""
self._running = True
if self.start_time is None: # Nur setzen wenn noch nicht gesetzt
self.start_time = time.time()
while self._running:
try:
samples = self.device.read(self.filter_window_size, 500, True)
if not samples:
raise DeviceDisconnectedError("No samples received")
current_time = time.time() - self.start_time
# Get voltage from Channel B (HI_Z mode) and current from Channel A
raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage
raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction # Channel A current with direction
# Update filter windows
self.voltage_window.append(raw_voltage)
self.current_window.append(raw_current)
if len(self.voltage_window) > self.filter_window_size:
self.voltage_window.pop(0)
self.current_window.pop(0)
voltage = np.mean(self.voltage_window)
current = np.mean(self.current_window)
# Validate measurements
if voltage is None or not (-1.0 <= voltage <= 6.0):
raise ValueError(f"Invalid voltage: {voltage}V")
if not (-0.25 <= current <= 0.25):
raise ValueError(f"Invalid current: {current}A")
# Emit update
self.update_signal.emit(voltage, current, current_time)
# Store measurement
try:
self.measurement_queue.put_nowait((voltage, current))
except Full:
pass
time.sleep(max(0.05, self.interval))
except Exception as e:
self.error_signal.emit(f"Read error: {str(e)}")
time.sleep(1)
continue
def set_direction(self, direction):
"""Set current direction (1 for source, -1 for sink)"""
self.current_direction = direction
def stop(self):
self._running = False
self.wait(500)
class TestSequenceWorker(QObject):
finished = pyqtSignal()
update_phase = pyqtSignal(str)
update_status = pyqtSignal(str)
test_completed = pyqtSignal()
error_occurred = pyqtSignal(str)
def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent):
super().__init__()
self.device = device
self.test_current = test_current
self.charge_cutoff = charge_cutoff
self.discharge_cutoff = discharge_cutoff
self.rest_time = rest_time * 3600 # Convert hours to seconds
self.continuous_mode = continuous_mode
self.parent = parent
self._running = True
self.voltage_timeout = 0.5 # seconds
def get_latest_measurement(self):
"""Thread-safe measurement reading with timeout"""
try:
return self.parent.measurement_thread.measurement_queue.get(
timeout=self.voltage_timeout
)
except Empty:
return (None, None) # Return tuple for unpacking
def charge_phase(self):
"""Handle the battery charging phase"""
self.update_phase.emit("Charge")
self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.4f}A")
try:
# Configure channels - Channel A sources current, Channel B measures voltage
self.device.channels['B'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].mode = pysmu.Mode.SIMV
self.device.channels['A'].constant(self.test_current)
self.parent.measurement_thread.set_direction(1) # Source current
# Small delay to allow current to stabilize
time.sleep(0.1)
while self._running:
voltage, current = self.get_latest_measurement()
if voltage is None:
continue
# Update parent's data for logging/display
with self.parent.plot_mutex:
if len(self.parent.voltage_data) > 0:
self.parent.voltage_data[-1] = voltage
self.parent.current_data[-1] = current
if voltage >= self.charge_cutoff:
break
time.sleep(0.1)
finally:
self.device.channels['A'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].constant(0)
def discharge_phase(self):
"""Handle the battery discharging phase"""
voltage, _ = self.get_latest_measurement()
if voltage is not None and voltage <= self.discharge_cutoff:
self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)")
return
self.update_phase.emit("Discharge")
self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A")
try:
# Configure channels - Channel A sinks current, Channel B measures voltage
self.device.channels['B'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].mode = pysmu.Mode.SIMV
self.device.channels['A'].constant(-self.test_current)
self.parent.measurement_thread.set_direction(-1) # Sink current
# Small delay to allow current to stabilize
time.sleep(0.1)
while self._running:
voltage, current = self.get_latest_measurement()
if voltage is None:
continue
# Update parent's data for logging/display
with self.parent.plot_mutex:
if len(self.parent.voltage_data) > 0:
self.parent.voltage_data[-1] = voltage
self.parent.current_data[-1] = current
if voltage <= self.discharge_cutoff:
break
time.sleep(0.1)
finally:
self.device.channels['A'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].constant(0)
def rest_phase(self, phase_name):
"""Handle rest period between phases"""
self.update_phase.emit(f"Resting ({phase_name})")
rest_end = time.time() + self.rest_time
while time.time() < rest_end and self._running:
time_left = max(0, rest_end - time.time())
self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min")
time.sleep(1)
def stop(self):
"""Request the thread to stop"""
self._running = False
try:
self.device.channels['A'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].constant(0)
self.device.channels['B'].mode = pysmu.Mode.HI_Z
except Exception as e:
print(f"Error stopping device: {e}")
def run(self):
"""Main test sequence loop"""
try:
first_cycle = True # Ensure at least one cycle runs
while (self._running and
(self.parent.continuous_mode_check.isChecked() or first_cycle)):
self.parent.request_stop = False
self.parent.cycle_count += 1
first_cycle = False # Only True for the first cycle
# 1. Charge phase (constant current)
self.charge_phase()
if not self._running or self.parent.request_stop:
break
# 2. Rest period after charge
self.rest_phase("Post-Charge")
if not self._running or self.parent.request_stop:
break
# 3. Discharge phase (capacity measurement)
self.discharge_phase()
if not self._running or self.parent.request_stop:
break
# 4. Rest period after discharge (only if not stopping)
if self._running and not self.parent.request_stop:
self.rest_phase("Post-Discharge")
# Calculate Coulomb efficiency if not stopping
if not self.parent.request_stop and self.parent.charge_capacity > 0:
self.parent.coulomb_efficiency = (
self.parent.capacity_ah / self.parent.charge_capacity
) * 100
# Test completed
self.test_completed.emit()
except Exception as e:
self.error_occurred.emit(f"Test sequence error: {str(e)}")
finally:
self.finished.emit()
class DischargeWorker(QObject):
finished = pyqtSignal()
update_status = pyqtSignal(str)
test_completed = pyqtSignal()
error_occurred = pyqtSignal(str)
def __init__(self, device, test_current, discharge_cutoff, parent):
super().__init__()
self.device = device
self.test_current = test_current
self.discharge_cutoff = discharge_cutoff
self.parent = parent
self._running = True
self.voltage_timeout = 0.5 # seconds
def get_latest_measurement(self):
"""Thread-safe measurement reading with timeout"""
try:
return self.parent.measurement_thread.measurement_queue.get(
timeout=self.voltage_timeout
)
except Empty:
return (None, None) # Return tuple for unpacking
def discharge_phase(self):
"""Handle the battery discharging phase"""
voltage, _ = self.get_latest_measurement()
if voltage is not None and voltage <= self.discharge_cutoff:
self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)")
return
self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A")
try:
# Configure channels - Channel A sinks current, Channel B measures voltage
self.device.channels['B'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].mode = pysmu.Mode.SIMV
self.device.channels['A'].constant(-self.test_current)
self.parent.measurement_thread.set_direction(-1) # Sink current
# Small delay to allow current to stabilize
time.sleep(0.1)
while self._running:
voltage, current = self.get_latest_measurement()
if voltage is None:
continue
# Update parent's data for logging/display
with self.parent.plot_mutex:
if len(self.parent.voltage_data) > 0:
self.parent.voltage_data[-1] = voltage
self.parent.current_data[-1] = current
if voltage <= self.discharge_cutoff:
break
time.sleep(0.1)
finally:
self.device.channels['A'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].constant(0)
def stop(self):
"""Request the thread to stop"""
self._running = False
try:
self.device.channels['A'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].constant(0)
self.device.channels['B'].mode = pysmu.Mode.HI_Z
except Exception as e:
print(f"Error stopping device: {e}")
def run(self):
"""Main discharge sequence"""
try:
self.parent.request_stop = False
self.parent.cycle_count = 1 # Only one discharge cycle
# Discharge phase
self.discharge_phase()
if not self._running or self.parent.request_stop:
return
# Test completed
self.test_completed.emit()
except Exception as e:
self.error_occurred.emit(f"Discharge error: {str(e)}")
finally:
self.finished.emit()
class ChargeWorker(QObject):
finished = pyqtSignal()
update_status = pyqtSignal(str)
test_completed = pyqtSignal()
error_occurred = pyqtSignal(str)
def __init__(self, device, test_current, charge_cutoff, parent):
super().__init__()
self.device = device
self.test_current = test_current
self.charge_cutoff = charge_cutoff
self.parent = parent
self._running = True
def run(self):
"""Main charge sequence"""
try:
self.parent.measurement_thread.set_direction(1) # Source current
# Configure channels - Channel A sources current, Channel B measures voltage
self.device.channels['B'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].mode = pysmu.Mode.SIMV
self.device.channels['A'].constant(self.test_current)
time.sleep(0.1) # Allow current to stabilize
while self._running:
voltage, current = self.parent.get_latest_measurement()
if voltage is None:
continue
# Update parent's data for logging/display
with self.parent.plot_mutex:
if len(self.parent.voltage_data) > 0:
self.parent.voltage_data[-1] = voltage
self.parent.current_data[-1] = current
if voltage >= self.charge_cutoff:
break
time.sleep(0.1)
self.test_completed.emit()
except Exception as e:
self.error_occurred.emit(f"Charge error: {str(e)}")
finally:
self.device.channels['A'].constant(0)
self.finished.emit()
def stop(self):
"""Request the thread to stop"""
self._running = False
try:
self.device.channels['A'].constant(0)
except Exception as e:
print(f"Error stopping charge: {e}")
class BatteryTester(QMainWindow):
def __init__(self):
self.plot_mutex = threading.Lock()
super().__init__()
self.last_logged_phase = None
# Color scheme
self.bg_color = "#2E3440"
self.fg_color = "#D8DEE9"
self.accent_color = "#5E81AC"
self.warning_color = "#BF616A"
self.success_color = "#A3BE8C"
# Device and measurement state
self.session_active = False
self.measuring = False
self.test_running = False
self.continuous_mode = False
self.request_stop = False
self.interval = 0.1
self.log_dir = os.path.expanduser("~/adalm1000/logs")
os.makedirs(self.log_dir, exist_ok=True)
# Data buffers
max_data_points = 36000 # Define this first
self.time_data = deque(maxlen=max_data_points)
self.voltage_data = deque(maxlen=max_data_points)
self.current_data = deque(maxlen=max_data_points)
self.max_points_to_keep = 10000
self.display_time_data = deque(maxlen=self.max_points_to_keep)
self.display_voltage_data = deque(maxlen=self.max_points_to_keep)
self.display_current_data = deque(maxlen=self.max_points_to_keep)
self.aggregation_buffer = {
'time': [], 'voltage': [], 'current': [],
'count': 0, 'last_plot_time': 0
}
self.phase_data = deque()
self.downsample_factor = 1 # Initial kein Downsampling
self.downsample_counter = 0
# Initialize all measurement variables
self.capacity_ah = 0.0
self.energy = 0.0
self.charge_capacity = 0.0
self.coulomb_efficiency = 0.0
self.cycle_count = 0
self.start_time = time.time()
self.last_update_time = self.start_time
# Initialize UI and device
self.setup_ui()
self.init_device()
# Set window properties
self.setWindowTitle("ADALM1000 - Battery Tester (Multi-Mode)")
self.resize(1000, 800)
self.setMinimumSize(800, 700)
# Status update timer
self.status_timer = QTimer()
self.status_timer.timeout.connect(self.update_status_and_plot)
self.status_timer.start(1000) #every second
def get_latest_measurement(self, timeout=0.5):
"""Return latest measurement (voltage, current, timestamp_or_none) for the main UI.
Uses the measurement_thread's measurement_queue if available, else falls back to
self.latest_measurement or measurement containers. Returns (None, None) on timeout.
"""
try:
# Preferred: measurement_thread with queue
if hasattr(self, 'measurement_thread') and getattr(self, 'measurement_thread') is not None:
mq = getattr(self.measurement_thread, 'measurement_queue', None)
if mq is not None:
try:
return mq.get(timeout=timeout)
except Empty:
return (None, None)
# Fallback: attribute latest_measurement
if hasattr(self, 'latest_measurement') and getattr(self, 'latest_measurement') is not None:
return getattr(self, 'latest_measurement')
# Fallback: list-like measurements
for attr in ('measurements', 'measurement_buffer', 'voltage_data', 'current_data'):
if hasattr(self, attr):
container = getattr(self, attr)
try:
if hasattr(container, '__len__') and len(container) > 0:
# If separate voltage/current arrays, return last pair if possible
if attr == 'voltage_data' and hasattr(self, 'current_data') and len(self.current_data) > 0:
return (self.voltage_data[-1], self.current_data[-1])
return container[-1]
except Exception:
pass
except Exception:
# Never raise from the getter - return a safe None tuple
pass
return (None, None)
def setup_ui(self):
"""Configure the user interface"""
# Main widget and layout
self.central_widget = QWidget()
self.setCentralWidget(self.central_widget)
self.main_layout = QVBoxLayout(self.central_widget)
self.main_layout.setContentsMargins(10, 10, 10, 10)
# Mode selection
mode_frame = QFrame()
mode_frame.setFrameShape(QFrame.StyledPanel)
mode_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
mode_layout = QHBoxLayout(mode_frame)
self.mode_label = QLabel("Test Mode:")
self.mode_label.setStyleSheet(f"color: {self.fg_color};")
mode_layout.addWidget(self.mode_label)
self.mode_combo = QComboBox()
self.mode_combo.addItems(["Live Monitoring", "Discharge Test", "Charge Test", "Cycle Test"]) # Added Charge Test
self.mode_combo.setStyleSheet(f"""
QComboBox {{
background-color: #3B4252;
color: {self.fg_color};
border: 1px solid #4C566A;
border-radius: 3px;
padding: 2px;
}}
""")
self.mode_combo.currentTextChanged.connect(self.change_mode)
mode_layout.addWidget(self.mode_combo, 1)
self.main_layout.addWidget(mode_frame)
# Header area
header_frame = QFrame()
header_frame.setFrameShape(QFrame.NoFrame)
header_layout = QHBoxLayout(header_frame)
header_layout.setContentsMargins(0, 0, 0, 0)
self.title_label = QLabel("ADALM1000 Battery Tester")
self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};")
header_layout.addWidget(self.title_label, 1)
# Status indicator
self.status_light = QLabel()
self.status_light.setFixedSize(20, 20)
self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
header_layout.addWidget(self.status_light)
self.connection_label = QLabel("Disconnected")
header_layout.addWidget(self.connection_label)
# Reconnect button
self.reconnect_btn = QPushButton("Reconnect")
self.reconnect_btn.clicked.connect(self.reconnect_device)
header_layout.addWidget(self.reconnect_btn)
self.main_layout.addWidget(header_frame)
# Measurement display
display_frame = QFrame()
display_frame.setFrameShape(QFrame.StyledPanel)
display_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
display_layout = QGridLayout(display_frame)
# Measurement values - common for all modes
measurement_labels = [
("Voltage", "V"), ("Current", "A"), ("Test Phase", ""),
("Elapsed Time", "s"), ("Capacity", "Ah"), ("Power", "W"),
("Energy", "Wh"), ("Cycle Count", ""), ("Battery Temp", "°C")
]
for i, (label, unit) in enumerate(measurement_labels):
row = i // 3
col = (i % 3) * 3
lbl = QLabel(f"{label}:")
lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;")
display_layout.addWidget(lbl, row, col)
value_lbl = QLabel("0.000")
value_lbl.setStyleSheet(f"""
color: {self.fg_color};
font-weight: bold;
font-size: 12px;
min-width: 60px;
""")
display_layout.addWidget(value_lbl, row, col + 1)
if unit:
unit_lbl = QLabel(unit)
unit_lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;")
display_layout.addWidget(unit_lbl, row, col + 2)
for i in range(9):
display_layout.setColumnStretch(i, 1 if i % 3 == 1 else 0)
self.voltage_label = display_layout.itemAtPosition(0, 1).widget()
self.current_label = display_layout.itemAtPosition(0, 4).widget()
self.phase_label = display_layout.itemAtPosition(0, 7).widget()
self.time_label = display_layout.itemAtPosition(1, 1).widget()
self.capacity_label = display_layout.itemAtPosition(1, 4).widget()
self.power_label = display_layout.itemAtPosition(1, 7).widget()
self.energy_label = display_layout.itemAtPosition(2, 1).widget()
self.cycle_label = display_layout.itemAtPosition(2, 4).widget()
self.temp_label = display_layout.itemAtPosition(2, 7).widget()
self.main_layout.addWidget(display_frame)
# Control area
controls_frame = QFrame()
controls_frame.setFrameShape(QFrame.NoFrame)
controls_layout = QHBoxLayout(controls_frame)
controls_layout.setContentsMargins(0, 0, 0, 0)
# Parameters frame
self.params_frame = QFrame()
self.params_frame.setFrameShape(QFrame.StyledPanel)
self.params_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
self.params_layout = QGridLayout(self.params_frame)
# Common parameters
self.capacity = 0.2
self.capacity_label_input = QLabel("Battery Capacity (Ah):")
self.capacity_label_input.setStyleSheet(f"color: {self.fg_color};")
self.params_layout.addWidget(self.capacity_label_input, 0, 0)
self.capacity_input = QLineEdit("0.2")
self.capacity_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
self.capacity_input.setFixedWidth(60)
self.params_layout.addWidget(self.capacity_input, 0, 1)
# C-rate for test
self.c_rate = 0.1
self.c_rate_label = QLabel("Test C-rate:")
self.c_rate_label.setStyleSheet(f"color: {self.fg_color};")
self.params_layout.addWidget(self.c_rate_label, 1, 0)
self.c_rate_input = QLineEdit("0.1")
self.c_rate_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
self.c_rate_input.setFixedWidth(40)
self.params_layout.addWidget(self.c_rate_input, 1, 1)
c_rate_note = QLabel("(e.g., 0.2 for C/5)")
c_rate_note.setStyleSheet(f"color: {self.fg_color};")
self.params_layout.addWidget(c_rate_note, 1, 2)
# Discharge cutoff (used in Discharge and Cycle modes)
self.discharge_cutoff = 0.9
self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):")
self.discharge_cutoff_label.setStyleSheet(f"color: {self.fg_color};")
self.params_layout.addWidget(self.discharge_cutoff_label, 2, 0)
self.discharge_cutoff_input = QLineEdit("0.9")
self.discharge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
self.discharge_cutoff_input.setFixedWidth(60)
self.params_layout.addWidget(self.discharge_cutoff_input, 2, 1)
# Charge cutoff (only for Cycle mode)
self.charge_cutoff = 1.43
self.charge_cutoff_label = QLabel("Charge Cutoff (V):")
self.charge_cutoff_label.setStyleSheet(f"color: {self.fg_color};")
self.params_layout.addWidget(self.charge_cutoff_label, 3, 0)
self.charge_cutoff_input = QLineEdit("1.43")
self.charge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
self.charge_cutoff_input.setFixedWidth(60)
self.params_layout.addWidget(self.charge_cutoff_input, 3, 1)
self.charge_cutoff_label.hide()
self.charge_cutoff_input.hide()
# Rest time (only for Cycle mode)
self.rest_time = 0.25
self.rest_time_label = QLabel("Rest Time (hours):")
self.rest_time_label.setStyleSheet(f"color: {self.fg_color};")
self.params_layout.addWidget(self.rest_time_label, 4, 0)
self.rest_time_input = QLineEdit("0.25")
self.rest_time_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
self.rest_time_input.setFixedWidth(60)
self.params_layout.addWidget(self.rest_time_input, 4, 1)
self.rest_time_label.hide()
self.rest_time_input.hide()
# Test conditions input
self.test_conditions_label = QLabel("Test Conditions/Chemistry:")
self.test_conditions_label.setStyleSheet(f"color: {self.fg_color};")
self.params_layout.addWidget(self.test_conditions_label, 5, 0)
self.test_conditions_input = QLineEdit("")
self.test_conditions_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
self.test_conditions_input.setFixedWidth(120)
self.params_layout.addWidget(self.test_conditions_input, 5, 1)
controls_layout.addWidget(self.params_frame, 1)
# Button frame
button_frame = QFrame()
button_frame.setFrameShape(QFrame.NoFrame)
button_layout = QVBoxLayout(button_frame)
button_layout.setContentsMargins(0, 0, 0, 0)
# Start/Stop buttons
self.start_button = QPushButton("START")
self.start_button.setStyleSheet(f"""
QPushButton {{
background-color: {self.accent_color};
color: {self.fg_color};
font-weight: bold;
padding: 6px;
border-radius: 4px;
}}
QPushButton:disabled {{
background-color: #4C566A;
color: #D8DEE9;
}}
""")
self.start_button.clicked.connect(self.start_test)
button_layout.addWidget(self.start_button)
self.stop_button = QPushButton("STOP")
self.stop_button.setStyleSheet(f"""
QPushButton {{
background-color: {self.warning_color};
color: {self.fg_color};
font-weight: bold;
padding: 6px;
border-radius: 4px;
}}
QPushButton:disabled {{
background-color: #4C566A;
color: #D8DEE9;
}}
""")
self.stop_button.clicked.connect(self.stop_test)
self.stop_button.setEnabled(False)
button_layout.addWidget(self.stop_button)
# Continuous mode checkbox (only for Cycle mode)
self.continuous_mode_check = QCheckBox("Continuous Mode")
self.continuous_mode_check.setChecked(True)
self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")
button_layout.addWidget(self.continuous_mode_check)
self.continuous_mode_check.stateChanged.connect(self.handle_continuous_mode_change)
self.continuous_mode_check.hide()
# Record button for Live mode
self.record_button = QPushButton("Start Recording")
self.record_button.setCheckable(True)
self.record_button.setStyleSheet(f"""
QPushButton {{
background-color: {self.success_color};
color: {self.fg_color};
font-weight: bold;
padding: 6px;
border-radius: 4px;
}}
QPushButton:checked {{
background-color: {self.warning_color};
}}
""")
self.record_button.clicked.connect(self.toggle_recording)
button_layout.addWidget(self.record_button)
self.record_button.hide()
controls_layout.addWidget(button_frame)
self.main_layout.addWidget(controls_frame)
# Plot area
self.setup_plot()
# Status bar
self.status_bar = self.statusBar()
self.status_bar.setStyleSheet(f"color: {self.fg_color};")
self.status_bar.showMessage("Ready")
# Apply dark theme
self.setStyleSheet(f"""
QMainWindow {{
background-color: {self.bg_color};
}}
QLabel {{
color: {self.fg_color};
}}
QLineEdit {{
background-color: #3B4252;
color: {self.fg_color};
border: 1px solid #4C566A;
border-radius: 3px;
padding: 2px;
}}
""")
# Set initial mode
self.current_mode = "Live Monitoring"
self.mode_combo.setCurrentText(self.current_mode)
self.change_mode(self.current_mode) # Initialize UI for live mode
def change_mode(self, mode_name):
"""Change between different test modes"""
self.current_mode = mode_name
self.stop_test() # Stop any current operation
# Hide all optional parameters first
self.charge_cutoff_label.hide()
self.charge_cutoff_input.hide()
self.discharge_cutoff_label.hide()
self.discharge_cutoff_input.hide()
self.rest_time_label.hide()
self.rest_time_input.hide()
self.continuous_mode_check.hide()
self.record_button.hide()
# Show mode-specific parameters
if mode_name == "Cycle Test":
self.charge_cutoff_label.show()
self.charge_cutoff_input.show()
self.discharge_cutoff_label.show()
self.discharge_cutoff_input.show()
self.rest_time_label.show()
self.rest_time_input.show()
self.continuous_mode_check.show()
self.start_button.setText("START CYCLE TEST")
self.start_button.setEnabled(True) # Explicitly enable
elif mode_name == "Discharge Test":
self.discharge_cutoff_label.show()
self.discharge_cutoff_input.show()
self.start_button.setText("START DISCHARGE")
self.start_button.setEnabled(True) # Explicitly enable
elif mode_name == "Charge Test":
self.charge_cutoff_label.show()
self.charge_cutoff_input.show()
self.start_button.setText("START CHARGE")
self.start_button.setEnabled(True) # Explicitly enable
elif mode_name == "Live Monitoring":
self.record_button.show()
self.start_button.setText("START MONITORING")
# Only enable start button if device is connected
self.start_button.setEnabled(self.session_active)
# Reset measurement state
self.reset_test()
self.status_bar.showMessage(f"Mode changed to {mode_name}")
def reset_test(self):
"""Reset test state without stopping measurement"""
# Reset Downsampling
self.downsample_factor = 1
self.downsample_counter = 0
# Clear all data buffers
with self.plot_mutex:
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
if hasattr(self, 'phase_data'):
self.phase_data.clear()
# Also clear display buffers
if hasattr(self, 'display_time_data'):
self.display_time_data.clear()
self.display_voltage_data.clear()
self.display_current_data.clear()
# Reset aggregation buffer
self.aggregation_buffer = {
'time': [], 'voltage': [], 'current': [],
'count': 0, 'last_plot_time': 0
}
# Clear measurement thread buffers if it exists
if hasattr(self, 'measurement_thread'):
self.measurement_thread.voltage_window.clear()
self.measurement_thread.current_window.clear()
with self.measurement_thread.measurement_queue.mutex:
self.measurement_thread.measurement_queue.queue.clear()
self.measurement_thread.start_time = time.time()
# Reset capacities and timing
self.start_time = time.time()
self.last_update_time = self.start_time
self.capacity_ah = 0.0
self.energy = 0.0
if hasattr(self, 'charge_capacity'):
self.charge_capacity = 0.0
if hasattr(self, 'coulomb_efficiency'):
self.coulomb_efficiency = 0.0
# Reset plot
self.reset_plot()
# Update UI
self.phase_label.setText("Idle")
if hasattr(self, 'test_phase'):
self.test_phase = "Idle"
def toggle_recording(self):
"""Toggle data recording in Live Monitoring mode"""
if self.record_button.isChecked():
# Start recording
try:
# Reset previous data
self.reset_test()
# Reset measurement timing
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
if self.create_cycle_log_file():
self.record_button.setText("Stop Recording")
self.status_bar.showMessage("Live recording started")
# Ensure monitoring is running
if not self.test_running:
self.start_live_monitoring()
else:
self.record_button.setChecked(False)
self.current_cycle_file = None
except Exception as e:
print(f"Error starting recording: {e}")
self.record_button.setChecked(False)
self.current_cycle_file = None
QMessageBox.critical(self, "Error", f"Failed to start recording:\n{str(e)}")
else:
# Stop recording
try:
if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
self.finalize_log_file()
self.record_button.setText("Start Recording")
self.status_bar.showMessage("Live recording stopped")
except Exception as e:
print(f"Error stopping recording: {e}")
def handle_continuous_mode_change(self, state):
"""Handle changes to continuous mode checkbox during operation"""
if not state and self.test_running: # If unchecked during test
self.status_bar.showMessage("Continuous mode disabled - will complete current cycle")
self.continuous_mode_check.setStyleSheet(f"color: {self.warning_color};")
QTimer.singleShot(2000, lambda: self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};"))
def setup_plot(self):
"""Configure the matplotlib plot"""
self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color)
self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
self.ax = self.fig.add_subplot(111)
self.ax.set_facecolor('#3B4252')
# Set initial voltage range
voltage_padding = 0.2
min_voltage = max(0, 0.9 - voltage_padding)
max_voltage = 1.43 + voltage_padding
self.ax.set_ylim(min_voltage, max_voltage)
# Voltage plot
self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2)
self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
self.ax.tick_params(axis='y', labelcolor='#00BFFF')
# Current plot (right axis)
self.ax2 = self.ax.twinx()
current_padding = 0.05
test_current = 0.1 * 0.2 # Default values
max_current = test_current * 1.5
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
self.ax2.set_ylabel("Current (A)", color='r')
self.ax2.tick_params(axis='y', labelcolor='r')
self.ax.set_xlabel('Time (s)', color=self.fg_color)
self.ax.set_title('Battery Test', color=self.fg_color)
self.ax.tick_params(axis='x', colors=self.fg_color)
self.ax.grid(True, color='#4C566A')
# Position legends
self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))
# Embed plot
self.canvas = FigureCanvas(self.fig)
self.canvas.setStyleSheet(f"background-color: {self.bg_color};")
self.main_layout.addWidget(self.canvas, 1)
def init_device(self):
"""Initialize the ADALM1000 device with continuous measurement"""
try:
# Clean up any existing session
if hasattr(self, 'session'):
try:
self.session.end()
del self.session
except:
pass
time.sleep(1)
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
if not self.session.devices:
raise Exception("No ADALM1000 detected - check connections")
self.dev = self.session.devices[0]
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
self.dev.channels['B'].constant(0)
self.session.start(0)
self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;")
self.connection_label.setText("Connected")
self.status_bar.showMessage("Device connected | Ready to measure")
self.session_active = True
self.start_button.setEnabled(True)
# Start measurement thread
self.measurement_thread = MeasurementThread(self.dev, self.interval)
self.measurement_thread.update_signal.connect(self.update_measurements)
self.measurement_thread.error_signal.connect(self.handle_device_error)
self.measurement_thread.start()
except Exception as e:
self.handle_device_error(str(e))
@pyqtSlot(float, float, float)
def update_measurements(self, voltage, current, current_time):
try:
# Only store data if in a test or recording
if not (self.test_running or self.record_button.isChecked()):
return
# 1. Originale Daten immer vollständig speichern (für Berechnungen und Logging)
with self.plot_mutex:
self.time_data.append(current_time)
self.voltage_data.append(voltage)
self.current_data.append(current)
# 2. Downsampling für die Anzeige
if not hasattr(self, 'aggregation_buffer'):
self.aggregation_buffer = {
'time': [], 'voltage': [], 'current': [],
'count': 0, 'last_plot_time': 0
}
self.aggregation_buffer['time'].append(current_time)
self.aggregation_buffer['voltage'].append(voltage)
self.aggregation_buffer['current'].append(current)
self.aggregation_buffer['count'] += 1
# Nur aggregieren wenn genug Daten oder Zeit vergangen
now = time.time()
if (self.aggregation_buffer['count'] >= self.downsample_factor or
now - self.aggregation_buffer['last_plot_time'] >= 1.0):
# Berechne aggregierte Werte (Mittelwert)
agg_time = np.mean(self.aggregation_buffer['time'])
agg_voltage = np.mean(self.aggregation_buffer['voltage'])
agg_current = np.mean(self.aggregation_buffer['current'])
# Für die Anzeige verwenden
if not hasattr(self, 'display_time_data'):
self.display_time_data = deque(maxlen=self.max_points_to_keep)
self.display_voltage_data = deque(maxlen=self.max_points_to_keep)
self.display_current_data = deque(maxlen=self.max_points_to_keep)
self.display_time_data.append(agg_time)
self.display_voltage_data.append(agg_voltage)
self.display_current_data.append(agg_current)
# Reset Buffer
self.aggregation_buffer = {
'time': [], 'voltage': [], 'current': [],
'count': 0, 'last_plot_time': now
}
# 3. Originale Funktionalität für Berechnungen beibehalten
self.voltage_label.setText(f"{voltage:.4f}")
self.current_label.setText(f"{abs(current):.4f}")
self.time_label.setText(self.format_time(current_time))
# Calculate and display power and energy
power = voltage * abs(current)
self.power_label.setText(f"{power:.4f}")
if len(self.time_data) > 1:
delta_t = self.time_data[-1] - self.time_data[-2]
self.energy += power * delta_t / 3600 # Convert to Wh
self.energy_label.setText(f"{self.energy:.4f}")
# 4. Auto-Skalierung anpassen
if len(self.time_data) > self.max_points_to_keep * 1.5:
self.adjust_downsampling()
# 5. Plot updates throttled to 10Hz
if not hasattr(self, '_last_plot_update'):
self._last_plot_update = 0
if now - self._last_plot_update >= 0.1:
self._last_plot_update = now
QTimer.singleShot(0, self.update_plot)
except Exception as e:
print(f"Error in update_measurements: {e}")
import traceback
traceback.print_exc()
# Versuche den Aggregationsbuffer zu retten
if hasattr(self, 'aggregation_buffer'):
agg_buffer = self.aggregation_buffer
if agg_buffer['count'] > 0:
try:
with self.plot_mutex:
if not hasattr(self, 'display_time_data'):
self.display_time_data = deque(maxlen=self.max_points_to_keep)
self.display_voltage_data = deque(maxlen=self.max_points_to_keep)
self.display_current_data = deque(maxlen=self.max_points_to_keep)
self.display_time_data.append(np.mean(agg_buffer['time']))
self.display_voltage_data.append(np.mean(agg_buffer['voltage']))
self.display_current_data.append(np.mean(agg_buffer['current']))
except:
pass
self.aggregation_buffer = {
'time': [], 'voltage': [], 'current': [],
'count': 0, 'last_plot_time': time.time()
}
def adjust_downsampling(self):
current_length = len(self.time_data)
if current_length > self.max_points_to_keep * 1.5:
# Exponentiell erhöhen, aber max. 64
new_factor = min(64, max(1, self.downsample_factor * 2))
elif current_length < self.max_points_to_keep // 2:
# Halbieren, aber min. 1
new_factor = max(1, self.downsample_factor // 2)
else:
return
if new_factor != self.downsample_factor:
self.downsample_factor = new_factor
self.status_bar.showMessage(
f"Downsampling: Factor {self.downsample_factor}", 2000)
def update_status_and_plot(self):
"""Combined status and plot update"""
self.update_status()
self.update_plot()
def update_status(self):
"""Update status information periodically"""
now = time.time() # Define 'now' at the start of the method
if self.test_running or hasattr(self, 'record_button') and self.record_button.isChecked():
if self.time_data:
current_time = self.time_data[-1]
if len(self.time_data) > 1:
delta_t = self.time_data[-1] - self.time_data[-2]
if delta_t > 0:
current_current = abs(self.current_data[-1])
self.capacity_ah += current_current * delta_t / 3600
self.capacity_label.setText(f"{self.capacity_ah:.4f}")
# Logging (1x per second)
if (hasattr(self, 'log_writer') and
hasattr(self, 'current_cycle_file') and
self.current_cycle_file is not None and
not self.current_cycle_file.closed):
# Initialize last log time if not exists
if not hasattr(self, '_last_log_time'):
self._last_log_time = now
if self.time_data and (now - self._last_log_time >= 1.0):
try:
current_time = self.time_data[-1]
voltage = self.voltage_data[-1]
current = self.current_data[-1]
if self.current_mode == "Cycle Test":
self.log_writer.writerow([
f"{current_time:.4f}",
f"{voltage:.6f}",
f"{current:.6f}",
self.test_phase,
f"{self.capacity_ah:.4f}",
f"{self.charge_capacity:.4f}",
f"{self.coulomb_efficiency:.1f}",
f"{self.cycle_count}"
])
else:
self.log_writer.writerow([
f"{current_time:.4f}",
f"{voltage:.6f}",
f"{current:.6f}",
self.test_phase if hasattr(self, 'test_phase') else "Live",
f"{self.capacity_ah:.4f}",
f"{voltage * current:.4f}", # Power
f"{self.energy:.4f}", # Energy
f"{self.cycle_count}" if hasattr(self, 'cycle_count') else "1"
])
self.current_cycle_file.flush()
self._last_log_time = now
except Exception as e:
print(f"Error writing to log file: {e}")
if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
try:
self.current_cycle_file.close()
except:
pass
self.record_button.setChecked(False)
self.current_cycle_file = None
def start_test(self):
"""Start the selected test mode"""
if self.current_mode == "Cycle Test":
self.start_cycle_test()
elif self.current_mode == "Discharge Test":
self.start_discharge_test()
elif self.current_mode == "Charge Test":
self.start_charge_test()
elif self.current_mode == "Live Monitoring":
self.start_live_monitoring()
def start_cycle_test(self):
"""Start the battery cycle test"""
# Clean up any previous test
if hasattr(self, 'test_sequence_thread'):
try:
th = getattr(self, 'test_sequence_thread', None)
# Only operate on thread if it still exists and wasn't deleted by SIP
if th is not None and not sip.isdeleted(th):
try:
if th.isRunning():
wk = getattr(self, 'test_sequence_worker', None)
if wk is not None and not sip.isdeleted(wk):
try:
wk.stop()
except Exception:
pass
try:
th.quit()
except Exception:
pass
try:
th.wait(500)
except Exception:
pass
except RuntimeError:
# Thread object may have been deleted concurrently
pass
except Exception:
pass
finally:
# Safely schedule deletion of worker and thread, if still valid
try:
wk = getattr(self, 'test_sequence_worker', None)
if wk is not None and not sip.isdeleted(wk):
try:
wk.deleteLater()
except Exception:
pass
except Exception:
pass
try:
th = getattr(self, 'test_sequence_thread', None)
if th is not None and not sip.isdeleted(th):
try:
th.deleteLater()
except Exception:
pass
except Exception:
pass
# Remove references to help GC
try:
delattr = delattr
if hasattr(self, 'test_sequence_thread'):
try:
delattr(self, 'test_sequence_thread')
except Exception:
try:
setattr(self, 'test_sequence_thread', None)
except Exception:
pass
except Exception:
pass
# Reset stop flag
self.request_stop = False
if not self.test_running:
try:
# Get parameters from UI
self.capacity = float(self.capacity_input.text())
self.charge_cutoff = float(self.charge_cutoff_input.text())
self.discharge_cutoff = float(self.discharge_cutoff_input.text())
self.rest_time = float(self.rest_time_input.text())
self.c_rate = float(self.c_rate_input.text())
# Validate inputs
if self.capacity <= 0:
raise ValueError("Battery capacity must be positive")
if self.charge_cutoff <= self.discharge_cutoff:
raise ValueError("Charge cutoff must be higher than discharge cutoff")
if self.c_rate <= 0:
raise ValueError("C-rate must be positive")
test_current = self.c_rate * self.capacity
if test_current > 0.2:
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
# Clear ALL previous data completely
with self.plot_mutex:
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
self.phase_data.clear()
# Reset capacities and timing
self.start_time = time.time()
self.last_update_time = self.start_time
self.capacity_ah = 0.0
self.charge_capacity = 0.0
self.coulomb_efficiency = 0.0
self.cycle_count = 0
self.energy = 0.0
# Reset measurement thread's timer and queues
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
self.measurement_thread.voltage_window.clear()
self.measurement_thread.current_window.clear()
with self.measurement_thread.measurement_queue.mutex:
self.measurement_thread.measurement_queue.queue.clear()
# Reset plot completely
self.reset_plot()
# Start test
self.test_running = True
self.start_time = time.time()
self.last_update_time = time.time()
self.test_phase = "Initial Discharge"
self.phase_label.setText(self.test_phase)
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
self.status_bar.showMessage(f"Cycle test started | Current: {test_current:.4f}A")
# Create log file
self.create_cycle_log_file()
# Start test sequence in a QThread
self.test_sequence_thread = QThread()
self.test_sequence_worker = TestSequenceWorker(
self.dev,
test_current,
self.charge_cutoff,
self.discharge_cutoff,
self.rest_time,
self.continuous_mode_check.isChecked(),
self # Pass reference to main window for callbacks
)
self.test_sequence_worker.moveToThread(self.test_sequence_thread)
# Connect signals
self.test_sequence_worker.update_phase.connect(self.update_test_phase)
self.test_sequence_worker.update_status.connect(self.status_bar.showMessage)
self.test_sequence_worker.test_completed.connect(self.finalize_test)
self.test_sequence_worker.error_occurred.connect(self.handle_test_error)
self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit)
self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater)
self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater)
# Start the thread and the worker's run method
self.test_sequence_thread.start()
QTimer.singleShot(0, self.test_sequence_worker.run)
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
# Ensure buttons are in correct state if error occurs
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
def start_discharge_test(self):
"""Start the battery discharge test"""
# Clean up any previous test
if hasattr(self, 'discharge_thread'):
try:
th = getattr(self, 'discharge_thread', None)
# Only operate on thread if it still exists and wasn't deleted by SIP
if th is not None and not sip.isdeleted(th):
try:
if th.isRunning():
wk = getattr(self, 'discharge_worker', None)
if wk is not None and not sip.isdeleted(wk):
try:
wk.stop()
except Exception:
pass
try:
th.quit()
except Exception:
pass
try:
th.wait(500)
except Exception:
pass
except RuntimeError:
# Thread object may have been deleted concurrently
pass
except Exception:
pass
finally:
# Safely schedule deletion of worker and thread, if still valid
try:
wk = getattr(self, 'discharge_worker', None)
if wk is not None and not sip.isdeleted(wk):
try:
wk.deleteLater()
except Exception:
pass
except Exception:
pass
try:
th = getattr(self, 'discharge_thread', None)
if th is not None and not sip.isdeleted(th):
try:
th.deleteLater()
except Exception:
pass
except Exception:
pass
# Remove references to help GC
try:
delattr = delattr
if hasattr(self, 'discharge_thread'):
try:
delattr(self, 'discharge_thread')
except Exception:
try:
setattr(self, 'discharge_thread', None)
except Exception:
pass
except Exception:
pass
# Reset stop flag
self.request_stop = False
if not self.test_running:
try:
# Get parameters from UI
self.capacity = float(self.capacity_input.text())
self.discharge_cutoff = float(self.discharge_cutoff_input.text())
self.c_rate = float(self.c_rate_input.text())
# Validate inputs
if self.capacity <= 0:
raise ValueError("Battery capacity must be positive")
if self.c_rate <= 0:
raise ValueError("C-rate must be positive")
test_current = self.c_rate * self.capacity
if test_current > 0.2:
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
# Clear ALL previous data completely
with self.plot_mutex:
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
# Reset capacities and timing
self.start_time = time.time()
self.last_update_time = self.start_time
self.capacity_ah = 0.0
self.energy = 0.0
self.cycle_count = 1
# Reset measurement thread's timer and queues
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
self.measurement_thread.voltage_window.clear()
self.measurement_thread.current_window.clear()
with self.measurement_thread.measurement_queue.mutex:
self.measurement_thread.measurement_queue.queue.clear()
# Reset plot completely
self.reset_plot()
# Start test
self.test_running = True
self.start_time = time.time()
self.last_update_time = time.time()
self.test_phase = "Discharge"
self.phase_label.setText(self.test_phase)
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
self.status_bar.showMessage(f"Discharge started | Current: {test_current:.4f}A")
# Create log file
self.create_cycle_log_file()
# Start discharge worker in a QThread
self.discharge_thread = QThread()
self.discharge_worker = DischargeWorker(
self.dev,
test_current,
self.discharge_cutoff,
self # Pass reference to main window for callbacks
)
self.discharge_worker.moveToThread(self.discharge_thread)
# Connect signals
self.discharge_worker.update_status.connect(self.status_bar.showMessage)
self.discharge_worker.test_completed.connect(self.finalize_test)
self.discharge_worker.error_occurred.connect(self.handle_test_error)
self.discharge_worker.finished.connect(self.discharge_thread.quit)
self.discharge_worker.finished.connect(self.discharge_worker.deleteLater)
self.discharge_thread.finished.connect(self.discharge_thread.deleteLater)
# Start the thread and the worker's run method
self.discharge_thread.start()
QTimer.singleShot(0, self.discharge_worker.run)
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
# Ensure buttons are in correct state if error occurs
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
def start_charge_test(self):
"""Start the battery charge test"""
# Clean up any previous test
if hasattr(self, 'charge_thread'):
try:
th = getattr(self, 'charge_thread', None)
# Only operate on thread if it still exists and wasn't deleted by SIP
if th is not None and not sip.isdeleted(th):
try:
if th.isRunning():
wk = getattr(self, 'charge_worker', None)
if wk is not None and not sip.isdeleted(wk):
try:
wk.stop()
except Exception:
pass
try:
th.quit()
except Exception:
pass
try:
th.wait(500)
except Exception:
pass
except RuntimeError:
# Thread object may have been deleted concurrently
pass
except Exception:
pass
finally:
# Safely schedule deletion of worker and thread, if still valid
try:
wk = getattr(self, 'charge_worker', None)
if wk is not None and not sip.isdeleted(wk):
try:
wk.deleteLater()
except Exception:
pass
except Exception:
pass
try:
th = getattr(self, 'charge_thread', None)
if th is not None and not sip.isdeleted(th):
try:
th.deleteLater()
except Exception:
pass
except Exception:
pass
# Remove references to help GC
try:
delattr = delattr
if hasattr(self, 'charge_thread'):
try:
delattr(self, 'charge_thread')
except Exception:
try:
setattr(self, 'charge_thread', None)
except Exception:
pass
except Exception:
pass
# Reset stop flag
self.request_stop = False
if not self.test_running:
try:
# Get parameters from UI
self.capacity = float(self.capacity_input.text())
self.charge_cutoff = float(self.charge_cutoff_input.text())
self.c_rate = float(self.c_rate_input.text())
# Validate inputs
if self.capacity <= 0:
raise ValueError("Battery capacity must be positive")
if self.c_rate <= 0:
raise ValueError("C-rate must be positive")
test_current = self.c_rate * self.capacity
if test_current > 0.2:
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
# Clear ALL previous data completely
with self.plot_mutex:
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
# Reset capacities and timing
self.start_time = time.time()
self.last_update_time = self.start_time
self.capacity_ah = 0.0
self.energy = 0.0
self.cycle_count = 1
# Reset measurement thread
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
self.measurement_thread.voltage_window.clear()
self.measurement_thread.current_window.clear()
with self.measurement_thread.measurement_queue.mutex:
self.measurement_thread.measurement_queue.queue.clear()
# Reset plot
self.reset_plot()
# Start test
self.test_running = True
self.start_time = time.time()
self.last_update_time = time.time()
self.test_phase = "Charge"
self.phase_label.setText(self.test_phase)
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
self.status_bar.showMessage(f"Charge started @ {test_current:.3f}A to {self.charge_cutoff}V")
# Create log file
self.create_cycle_log_file()
# Start charge worker in a QThread
self.charge_thread = QThread()
self.charge_worker = ChargeWorker(
self.dev,
test_current,
self.charge_cutoff,
self
)
self.charge_worker.moveToThread(self.charge_thread)
# Connect signals
self.charge_worker.update_status.connect(self.status_bar.showMessage)
self.charge_worker.test_completed.connect(self.finalize_test)
self.charge_worker.error_occurred.connect(self.handle_test_error)
self.charge_worker.finished.connect(self.charge_thread.quit)
self.charge_worker.finished.connect(self.charge_worker.deleteLater)
self.charge_thread.finished.connect(self.charge_thread.deleteLater)
# Start the thread
self.charge_thread.start()
QTimer.singleShot(0, self.charge_worker.run)
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
def start_live_monitoring(self):
"""Start live monitoring mode"""
try:
# Reset everything completely
self.reset_test()
# Reset measurement timing
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
# Set monitoring flags
self.test_running = True
self.test_phase = "Live Monitoring"
self.phase_label.setText(self.test_phase)
# Update UI
self.stop_button.setEnabled(True)
self.start_button.setEnabled(False)
# Configure device for monitoring
if hasattr(self, 'dev'):
try:
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
except Exception as e:
print(f"Error configuring device for monitoring: {e}")
self.status_bar.showMessage("Live monitoring started")
except Exception as e:
print(f"Error starting live monitoring: {e}")
self.test_running = False
QMessageBox.critical(self, "Error", f"Failed to start monitoring:\n{str(e)}")
def create_cycle_log_file(self):
"""Create a new log file for the current test"""
try:
self._last_log_time = time.time()
# Close previous file if exists
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
try:
self.current_cycle_file.close()
except Exception as e:
print(f"Error closing previous log file: {e}")
# Ensure log directory exists
os.makedirs(self.log_dir, exist_ok=True)
if not os.access(self.log_dir, os.W_OK):
QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}")
return False
# Generate filename based on mode
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if self.current_mode == "Cycle Test":
self.filename = os.path.join(self.log_dir, f"battery_cycle_{timestamp}.csv")
elif self.current_mode == "Discharge Test":
self.filename = os.path.join(self.log_dir, f"battery_discharge_{timestamp}.csv")
else: # Live Monitoring
self.filename = os.path.join(self.log_dir, f"battery_live_{timestamp}.csv")
# Open new file
try:
self.current_cycle_file = open(self.filename, 'w', newline='')
# Write header with test parameters
test_current = self.c_rate * self.capacity
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
self.current_cycle_file.write(f"# ADALM1000 Battery Test Log - {self.current_mode}\n")
self.current_cycle_file.write(f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
self.current_cycle_file.write(f"# Battery Capacity: {self.capacity} Ah\n")
if self.current_mode != "Live Monitoring":
self.current_cycle_file.write(f"# Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n")
if self.current_mode == "Cycle Test":
self.current_cycle_file.write(f"# Charge Cutoff: {self.charge_cutoff} V\n")
self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n")
self.current_cycle_file.write(f"# Rest Time: {self.rest_time} hours\n")
elif self.current_mode == "Discharge Test":
self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n")
self.current_cycle_file.write(f"# Test Conditions/Chemistry: {test_conditions}\n")
self.current_cycle_file.write("#\n")
# Write data header
self.log_writer = csv.writer(self.current_cycle_file)
if self.current_mode == "Cycle Test":
self.log_writer.writerow([
"Time(s)", "Voltage(V)", "Current(A)", "Phase",
"Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
"Coulomb_Eff(%)", "Cycle"
])
else:
self.log_writer.writerow([
"Time(s)", "Voltage(V)", "Current(A)", "Phase",
"Capacity(Ah)", "Power(W)", "Energy(Wh)", "Cycle"
])
return True
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to create log file: {e}")
return False
except Exception as e:
print(f"Error in create_cycle_log_file: {e}")
return False
def finalize_log_file(self):
"""Finalize the current log file"""
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
try:
test_current = self.c_rate * self.capacity
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
self.current_cycle_file.write("\n# TEST SUMMARY\n")
self.current_cycle_file.write(f"# Test Parameters:\n")
self.current_cycle_file.write(f"# - Battery Capacity: {self.capacity} Ah\n")
if self.current_mode != "Live Monitoring":
self.current_cycle_file.write(f"# - Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n")
if self.current_mode == "Cycle Test":
self.current_cycle_file.write(f"# - Charge Cutoff: {self.charge_cutoff} V\n")
self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n")
self.current_cycle_file.write(f"# - Rest Time: {self.rest_time} hours\n")
elif self.current_mode == "Discharge Test":
self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n")
self.current_cycle_file.write(f"# - Test Conditions: {test_conditions}\n")
self.current_cycle_file.write(f"# Results:\n")
if self.current_mode == "Cycle Test":
self.current_cycle_file.write(f"# - Cycles Completed: {self.cycle_count}\n")
self.current_cycle_file.write(f"# - Final Discharge Capacity: {self.capacity_ah:.4f} Ah\n")
self.current_cycle_file.write(f"# - Final Charge Capacity: {self.charge_capacity:.4f} Ah\n")
self.current_cycle_file.write(f"# - Coulombic Efficiency: {self.coulomb_efficiency:.1f}%\n")
else:
self.current_cycle_file.write(f"# - Capacity: {self.capacity_ah:.4f} Ah\n")
self.current_cycle_file.write(f"# - Energy: {self.energy:.4f} Wh\n")
self.current_cycle_file.close()
except Exception as e:
print(f"Error closing log file: {e}")
finally:
self.current_cycle_file = None
def format_time(self, seconds):
"""Convert seconds to hh:mm:ss format"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def stop_test(self):
"""Request immediate stop of the current test or monitoring"""
if not self.test_running and not (hasattr(self, 'record_button') and self.record_button.isChecked()):
return
self.request_stop = True
self.test_running = False
self.measuring = False
# Stop any active test threads
if hasattr(self, 'test_sequence_worker'):
try:
if not sip.isdeleted(self.test_sequence_worker):
self.test_sequence_worker.stop()
except:
pass
if hasattr(self, 'discharge_worker'):
try:
if not sip.isdeleted(self.discharge_worker):
self.discharge_worker.stop()
except:
pass
# Stop recording if active
if hasattr(self, 'record_button') and self.record_button.isChecked():
self.record_button.setChecked(False)
if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
self.finalize_log_file()
self.record_button.setText("Start Recording")
# Reset device to safe state
if hasattr(self, 'dev'):
try:
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
except Exception as e:
print(f"Error resetting device: {e}")
# Clear all data buffers
with self.plot_mutex:
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
if hasattr(self, 'phase_data'):
self.phase_data.clear()
# Reset measurements
self.capacity_ah = 0.0
self.energy = 0.0
if hasattr(self, 'charge_capacity'):
self.charge_capacity = 0.0
if hasattr(self, 'coulomb_efficiency'):
self.coulomb_efficiency = 0.0
# Reset plot
self.reset_plot()
# Update UI
self.test_phase = "Idle"
self.phase_label.setText(self.test_phase)
self.stop_button.setEnabled(False)
self.start_button.setEnabled(True)
if self.current_mode == "Live Monitoring":
self.status_bar.showMessage("Live monitoring stopped")
else:
self.status_bar.showMessage("Test stopped - Ready for new test")
def finalize_test(self):
"""Final cleanup after test completes or is stopped"""
try:
# 1. Stop any active measurement or test operations
self.measuring = False
self.test_running = False
# 2. Reset device to safe state
if hasattr(self, 'dev'):
try:
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
except Exception as e:
print(f"Error resetting device in finalize: {e}")
# 3. Clean up test sequence thread safely
if hasattr(self, 'test_sequence_thread'):
try:
if self.test_sequence_thread.isRunning():
if hasattr(self, 'test_sequence_worker'):
try:
self.test_sequence_worker.stop()
except RuntimeError:
pass
self.test_sequence_thread.quit()
self.test_sequence_thread.wait(500)
except Exception as e:
print(f"Error stopping test sequence thread: {e}")
finally:
if hasattr(self, 'test_sequence_worker'):
try:
if not sip.isdeleted(self.test_sequence_worker):
self.test_sequence_worker.deleteLater()
except:
pass
if hasattr(self, 'test_sequence_thread'):
try:
if not sip.isdeleted(self.test_sequence_thread):
self.test_sequence_thread.deleteLater()
except:
pass
finally:
if hasattr(self, 'test_sequence_thread'):
del self.test_sequence_thread
# 4. Clean up discharge thread safely
if hasattr(self, 'discharge_thread'):
try:
if self.discharge_thread.isRunning():
if hasattr(self, 'discharge_worker'):
try:
self.discharge_worker.stop()
except RuntimeError:
pass
self.discharge_thread.quit()
self.discharge_thread.wait(500)
except Exception as e:
print(f"Error stopping discharge thread: {e}")
finally:
if hasattr(self, 'discharge_worker'):
try:
if not sip.isdeleted(self.discharge_worker):
self.discharge_worker.deleteLater()
except:
pass
if hasattr(self, 'discharge_thread'):
try:
if not sip.isdeleted(self.discharge_thread):
self.discharge_thread.deleteLater()
except:
pass
finally:
if hasattr(self, 'discharge_thread'):
del self.discharge_thread
# 5. Clean up charge thread safely (using same pattern as discharge thread)
if hasattr(self, 'charge_thread'):
try:
if self.charge_thread.isRunning():
if hasattr(self, 'charge_worker'):
try:
self.charge_worker.stop()
except RuntimeError:
pass
self.charge_thread.quit()
self.charge_thread.wait(500)
except Exception as e:
print(f"Error stopping charge thread: {e}")
finally:
if hasattr(self, 'charge_worker'):
try:
if not sip.isdeleted(self.charge_worker):
self.charge_worker.deleteLater()
except:
pass
if hasattr(self, 'charge_thread'):
try:
if not sip.isdeleted(self.charge_thread):
self.charge_thread.deleteLater()
except:
pass
finally:
if hasattr(self, 'charge_thread'):
del self.charge_thread
# 6. Finalize log file
self.finalize_log_file()
# 7. Reset UI and state
self.request_stop = False
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
# 8. Show completion message if test wasn't stopped by user
if not self.request_stop:
test_current = self.c_rate * self.capacity
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
if self.current_mode == "Cycle Test":
message = (
f"Cycle test completed | "
f"Cycle {self.cycle_count} | "
f"Capacity: {self.capacity_ah:.4f}Ah | "
f"Efficiency: {self.coulomb_efficiency:.1f}%"
)
QMessageBox.information(
self,
"Test Completed",
f"Cycle test completed successfully.\n\n"
f"Test Parameters:\n"
f"- Capacity: {self.capacity} Ah\n"
f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n"
f"- Charge Cutoff: {self.charge_cutoff} V\n"
f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
f"- Conditions: {test_conditions}\n\n"
f"Results:\n"
f"- Cycles: {self.cycle_count}\n"
f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n"
f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%"
)
elif self.current_mode == "Discharge Test":
message = (
f"Discharge completed | "
f"Capacity: {self.capacity_ah:.4f}Ah | "
f"Energy: {self.energy:.4f}Wh"
)
QMessageBox.information(
self,
"Discharge Completed",
f"Discharge test completed successfully.\n\n"
f"Test Parameters:\n"
f"- Capacity: {self.capacity} Ah\n"
f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n"
f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
f"- Conditions: {test_conditions}\n\n"
f"Results:\n"
f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n"
f"- Energy delivered: {self.energy:.4f}Wh"
)
self.status_bar.showMessage(message)
except Exception as e:
print(f"Error in finalize_test: {e}")
import traceback
traceback.print_exc()
# Ensure we don't leave the UI in a locked state
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.status_bar.showMessage("Error during test finalization")
def reset_plot(self):
"""Completely reset the plot - clears all data and visuals"""
# Clear line data
self.line_voltage.set_data([], [])
self.line_current.set_data([], [])
# Reset axes with appropriate ranges
voltage_padding = 0.2
min_voltage = 0
max_voltage = 5.0 # Max voltage for ADALM1000
self.ax.set_xlim(0, 10) # Reset X axis
self.ax.set_ylim(min_voltage, max_voltage)
self.ax.set_xlabel('Time (s)', color=self.fg_color)
self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
self.ax.set_title('Battery Test', color=self.fg_color)
self.ax.tick_params(axis='x', colors=self.fg_color)
self.ax.tick_params(axis='y', labelcolor='#00BFFF')
self.ax.grid(True, color='#4C566A')
# Reset twin axis (current)
current_padding = 0.05
self.ax2.set_xlim(0, 10)
self.ax2.set_ylim(-0.25 - current_padding, 0.25 + current_padding)
self.ax2.set_ylabel("Current (A)", color='r')
self.ax2.tick_params(axis='y', labelcolor='r')
# Redraw legends
self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))
# Force immediate redraw
self.canvas.draw()
def update_status_and_plot(self):
"""Combined status and plot update"""
self.update_status()
self.update_plot()
def update_plot(self):
"""More robust plotting with error handling"""
try:
# Create local copies of data safely
with self.plot_mutex:
if not self.display_time_data:
return
x_data = np.array(self.display_time_data)
y1_data = np.array(self.display_voltage_data)
y2_data = np.array(self.display_current_data)
# Update plot data
self.line_voltage.set_data(x_data, y1_data)
self.line_current.set_data(x_data, y2_data)
# Auto-scale when needed
if len(x_data) > 1:
self.auto_scale_axes()
# Force redraw
self.canvas.draw_idle()
except Exception as e:
print(f"Plot error: {e}")
# Attempt to recover
self.reset_plot()
def auto_scale_axes(self):
"""Auto-scale plot axes with appropriate padding and strict boundaries"""
if not self.time_data:
return
min_time = 0
max_time = self.time_data[-1]
current_xlim = self.ax.get_xlim()
if max_time > current_xlim[1] * 0.95:
new_max = max_time * 1.05
self.ax.set_xlim(min_time, new_max)
self.ax2.set_xlim(min_time, new_max)
voltage_padding = 0.2
if self.voltage_data:
min_voltage = max(0, min(self.voltage_data) - voltage_padding)
max_voltage = min(5.0, max(self.voltage_data) + voltage_padding)
current_ylim = self.ax.get_ylim()
if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1):
self.ax.set_ylim(min_voltage, max_voltage)
current_padding = 0.05
if self.current_data:
min_current = max(-0.25, min(self.current_data) - current_padding)
max_current = min(0.25, max(self.current_data) + current_padding)
current_ylim2 = self.ax2.get_ylim()
if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02):
self.ax2.set_ylim(min_current, max_current)
@pyqtSlot(str)
def handle_device_error(self, error):
"""Handle device connection errors"""
error_msg = str(error)
print(f"Device error: {error_msg}")
if hasattr(self, 'session'):
try:
if self.session_active:
self.session.end()
del self.session
except Exception as e:
print(f"Error cleaning up session: {e}")
self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;")
self.connection_label.setText("Disconnected")
self.status_bar.showMessage(f"Device error: {error_msg}")
self.session_active = False
self.test_running = False
self.continuous_mode = False
self.measuring = False
self.start_button.setEnabled(False)
self.stop_button.setEnabled(False)
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
@pyqtSlot(str)
def update_test_phase(self, phase_text):
"""Update the test phase display"""
self.test_phase = phase_text
self.phase_label.setText(phase_text)
@pyqtSlot(str)
def handle_test_error(self, error_msg):
"""Handle errors from the test sequence with complete cleanup"""
try:
# 1. Notify user
QMessageBox.critical(self, "Test Error",
f"An error occurred:\n{error_msg}\n\nAttempting to recover...")
# 2. Stop all operations
self.stop_test()
# 3. Reset UI elements
if hasattr(self, 'line_voltage'):
try:
self.line_voltage.set_data([], [])
self.line_current.set_data([], [])
self.ax.set_xlim(0, 1)
self.ax2.set_xlim(0, 1)
self.canvas.draw()
except Exception as plot_error:
print(f"Plot reset error: {plot_error}")
# 4. Update status
self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...")
self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")
# 5. Attempt recovery
QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect
except Exception as e:
print(f"Error in error handler: {e}")
# Fallback - restart application?
QMessageBox.critical(self, "Fatal Error",
"The application needs to restart due to an unrecoverable error")
QTimer.singleShot(1000, self.close)
def attempt_reconnect(self):
"""Attempt to reconnect automatically"""
QMessageBox.critical(
self,
"Device Connection Error",
"Could not connect to ADALM1000\n\n"
"1. Check USB cable connection\n"
"2. The device will attempt to reconnect automatically"
)
QTimer.singleShot(1000, self.reconnect_device)
def reconnect_device(self):
"""Reconnect the device with proper cleanup"""
self.status_bar.showMessage("Attempting to reconnect...")
if hasattr(self, 'session'):
try:
if self.session_active:
self.session.end()
del self.session
except:
pass
self.test_running = False
self.continuous_mode = False
self.measuring = False
if hasattr(self, 'measurement_thread'):
self.measurement_thread.stop()
time.sleep(1.5)
try:
self.init_device()
if self.session_active:
self.status_bar.showMessage("Reconnected successfully")
return
except Exception as e:
print(f"Reconnect failed: {e}")
self.status_bar.showMessage("Reconnect failed - will retry...")
QTimer.singleShot(2000, self.reconnect_device)
def closeEvent(self, event):
"""Clean up on window close"""
self.test_running = False
self.measuring = False
self.session_active = False
# Stop measurement thread
if hasattr(self, 'measurement_thread'):
self.measurement_thread.stop()
# Stop test sequence thread
if hasattr(self, 'test_sequence_thread'):
try:
th = getattr(self, 'test_sequence_thread', None)
# Only operate on thread if it still exists and wasn't deleted by SIP
if th is not None and not sip.isdeleted(th):
try:
if th.isRunning():
wk = getattr(self, 'test_sequence_worker', None)
if wk is not None and not sip.isdeleted(wk):
try:
wk.stop()
except Exception:
pass
try:
th.quit()
except Exception:
pass
try:
th.wait(500)
except Exception:
pass
except RuntimeError:
# Thread object may have been deleted concurrently
pass
except Exception:
pass
finally:
# Safely schedule deletion of worker and thread, if still valid
try:
wk = getattr(self, 'test_sequence_worker', None)
if wk is not None and not sip.isdeleted(wk):
try:
wk.deleteLater()
except Exception:
pass
except Exception:
pass
try:
th = getattr(self, 'test_sequence_thread', None)
if th is not None and not sip.isdeleted(th):
try:
th.deleteLater()
except Exception:
pass
except Exception:
pass
# Remove references to help GC
try:
delattr = delattr
if hasattr(self, 'test_sequence_thread'):
try:
delattr(self, 'test_sequence_thread')
except Exception:
try:
setattr(self, 'test_sequence_thread', None)
except Exception:
pass
except Exception:
pass
# Stop discharge thread
if hasattr(self, 'discharge_thread'):
if hasattr(self, 'discharge_worker'):
self.discharge_worker.stop()
self.discharge_thread.quit()
self.discharge_thread.wait(500)
# Clean up device session
if hasattr(self, 'session') and self.session:
try:
self.session.end()
except Exception as e:
print(f"Error ending session: {e}")
event.accept()
if __name__ == "__main__":
app = QApplication([])
try:
window = BatteryTester()
window.show()
app.exec_()
except Exception as e:
QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}")