Diese Änderungen fügen:
Ein neues Eingabefeld für Testbedingungen/Zellchemie hinzu
Schreiben alle relevanten Testparameter in den Kopf der Logdatei
Fügen eine detaillierte Zusammenfassung am Ende der Logdatei hinzu
Zeigen die Testparameter auch in der Abschlussmeldung an
(D)
1170 lines
48 KiB
Python
1170 lines
48 KiB
Python
# -*- coding: utf-8 -*-
|
||
import os
|
||
import time
|
||
import csv
|
||
import threading
|
||
from datetime import datetime
|
||
import numpy as np
|
||
import matplotlib
|
||
matplotlib.use('Qt5Agg')
|
||
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
|
||
from matplotlib.figure import Figure
|
||
from collections import deque
|
||
from queue import Queue, Full, Empty
|
||
|
||
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel,
|
||
QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog)
|
||
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread
|
||
import pysmu
|
||
|
||
class DeviceDisconnectedError(Exception):
|
||
pass
|
||
|
||
class MeasurementThread(QThread):
|
||
update_signal = pyqtSignal(float, float, float)
|
||
error_signal = pyqtSignal(str)
|
||
|
||
def __init__(self, device, interval=0.1):
|
||
super().__init__()
|
||
self.device = device
|
||
self.interval = interval
|
||
self._running = False
|
||
self.filter_window_size = 10
|
||
self.voltage_window = []
|
||
self.current_window = []
|
||
self.start_time = time.time()
|
||
self.measurement_queue = Queue(maxsize=1)
|
||
|
||
def run(self):
|
||
self._running = True
|
||
while self._running:
|
||
try:
|
||
samples = self.device.read(self.filter_window_size, 500, True)
|
||
if not samples:
|
||
raise DeviceDisconnectedError("No samples received")
|
||
|
||
current_time = time.time() - self.start_time
|
||
|
||
# Get voltage from Channel B (HI_Z mode) and current from Channel A
|
||
raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage
|
||
raw_current = np.mean([s[0][1] for s in samples]) # Channel A current
|
||
|
||
# Update filter windows
|
||
self.voltage_window.append(raw_voltage)
|
||
self.current_window.append(raw_current)
|
||
|
||
if len(self.voltage_window) > self.filter_window_size:
|
||
self.voltage_window.pop(0)
|
||
self.current_window.pop(0)
|
||
|
||
voltage = np.mean(self.voltage_window)
|
||
current = np.mean(self.current_window)
|
||
|
||
# Emit update
|
||
self.update_signal.emit(voltage, current, current_time)
|
||
|
||
# Store measurement
|
||
try:
|
||
self.measurement_queue.put_nowait((voltage, current))
|
||
except Full:
|
||
pass
|
||
|
||
time.sleep(max(0.05, self.interval))
|
||
|
||
except Exception as e:
|
||
self.error_signal.emit(f"Read error: {str(e)}")
|
||
time.sleep(1)
|
||
continue
|
||
|
||
def stop(self):
|
||
self._running = False
|
||
self.wait(500)
|
||
|
||
class TestSequenceWorker(QObject):
|
||
finished = pyqtSignal()
|
||
update_phase = pyqtSignal(str)
|
||
update_status = pyqtSignal(str)
|
||
test_completed = pyqtSignal()
|
||
error_occurred = pyqtSignal(str)
|
||
|
||
def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent):
|
||
super().__init__()
|
||
self.device = device
|
||
self.test_current = test_current
|
||
self.charge_cutoff = charge_cutoff
|
||
self.discharge_cutoff = discharge_cutoff
|
||
self.rest_time = rest_time * 3600 # Convert hours to seconds
|
||
self.continuous_mode = continuous_mode
|
||
self.parent = parent
|
||
self._running = True
|
||
self.voltage_timeout = 0.5 # seconds
|
||
|
||
def get_latest_measurement(self):
|
||
"""Thread-safe measurement reading with timeout"""
|
||
try:
|
||
return self.parent.measurement_thread.measurement_queue.get(
|
||
timeout=self.voltage_timeout
|
||
)
|
||
except Empty:
|
||
return (None, None) # Return tuple for unpacking
|
||
|
||
def charge_phase(self):
|
||
"""Handle the battery charging phase"""
|
||
self.update_phase.emit("Charge")
|
||
self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.3f}A")
|
||
|
||
try:
|
||
# Configure channels - Channel A sources current, Channel B measures voltage
|
||
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
||
self.device.channels['A'].mode = pysmu.Mode.SIMV
|
||
self.device.channels['A'].constant(self.test_current)
|
||
|
||
# Small delay to allow current to stabilize
|
||
time.sleep(0.1)
|
||
|
||
while self._running:
|
||
voltage, current = self.get_latest_measurement()
|
||
if voltage is None:
|
||
continue
|
||
|
||
# Update parent's data for logging/display
|
||
with self.parent.plot_mutex:
|
||
if len(self.parent.voltage_data) > 0:
|
||
self.parent.voltage_data[-1] = voltage
|
||
self.parent.current_data[-1] = current
|
||
|
||
if voltage >= self.charge_cutoff:
|
||
break
|
||
|
||
time.sleep(0.1)
|
||
|
||
finally:
|
||
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.device.channels['A'].constant(0)
|
||
|
||
def discharge_phase(self):
|
||
"""Handle the battery discharging phase"""
|
||
self.update_phase.emit("Discharge")
|
||
self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A")
|
||
|
||
try:
|
||
# Configure channels - Channel A sinks current, Channel B measures voltage
|
||
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
||
self.device.channels['A'].mode = pysmu.Mode.SIMV
|
||
self.device.channels['A'].constant(-self.test_current)
|
||
|
||
# Small delay to allow current to stabilize
|
||
time.sleep(0.1)
|
||
|
||
while self._running:
|
||
voltage, current = self.get_latest_measurement()
|
||
if voltage is None:
|
||
continue
|
||
|
||
# Update parent's data for logging/display
|
||
with self.parent.plot_mutex:
|
||
if len(self.parent.voltage_data) > 0:
|
||
self.parent.voltage_data[-1] = voltage
|
||
self.parent.current_data[-1] = current
|
||
|
||
if voltage <= self.discharge_cutoff:
|
||
break
|
||
|
||
time.sleep(0.1)
|
||
|
||
finally:
|
||
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.device.channels['A'].constant(0)
|
||
|
||
def rest_phase(self, phase_name):
|
||
"""Handle rest period between phases"""
|
||
self.update_phase.emit(f"Resting ({phase_name})")
|
||
rest_end = time.time() + self.rest_time
|
||
|
||
while time.time() < rest_end and self._running:
|
||
time_left = max(0, rest_end - time.time())
|
||
self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min")
|
||
time.sleep(1)
|
||
|
||
def stop(self):
|
||
"""Request the thread to stop"""
|
||
self._running = False
|
||
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.device.channels['A'].constant(0)
|
||
|
||
def run(self):
|
||
"""Main test sequence loop"""
|
||
try:
|
||
while self._running and (self.continuous_mode or self.parent.cycle_count == 0):
|
||
# Reset stop request at start of each cycle
|
||
self.parent.request_stop = False
|
||
self.parent.cycle_count += 1
|
||
|
||
# 1. Charge phase (constant current)
|
||
self.charge_phase()
|
||
if not self._running or self.parent.request_stop:
|
||
break
|
||
|
||
# 2. Rest period after charge
|
||
self.rest_phase("Post-Charge")
|
||
if not self._running or self.parent.request_stop:
|
||
break
|
||
|
||
# 3. Discharge phase (capacity measurement)
|
||
self.discharge_phase()
|
||
if not self._running or self.parent.request_stop:
|
||
break
|
||
|
||
# 4. Rest period after discharge (only if not stopping)
|
||
if self._running and not self.parent.request_stop:
|
||
self.rest_phase("Post-Discharge")
|
||
|
||
# Calculate Coulomb efficiency if not stopping
|
||
if not self.parent.request_stop and self.parent.charge_capacity > 0:
|
||
self.parent.coulomb_efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100
|
||
|
||
# Test completed
|
||
self.test_completed.emit()
|
||
|
||
except Exception as e:
|
||
self.error_occurred.emit(f"Test sequence error: {str(e)}")
|
||
finally:
|
||
self.finished.emit()
|
||
|
||
class BatteryTester(QMainWindow):
|
||
def __init__(self):
|
||
self.plot_mutex = threading.Lock()
|
||
super().__init__()
|
||
|
||
# Color scheme
|
||
self.bg_color = "#2E3440"
|
||
self.fg_color = "#D8DEE9"
|
||
self.accent_color = "#5E81AC"
|
||
self.warning_color = "#BF616A"
|
||
self.success_color = "#A3BE8C"
|
||
|
||
# Device and measurement state
|
||
self.session_active = False
|
||
self.measuring = False
|
||
self.test_running = False
|
||
self.continuous_mode = False
|
||
self.request_stop = False
|
||
self.interval = 0.1
|
||
self.log_dir = os.path.expanduser("~/adalm1000/logs")
|
||
os.makedirs(self.log_dir, exist_ok=True)
|
||
|
||
# Data buffers
|
||
self.time_data = deque()
|
||
self.voltage_data = deque()
|
||
self.current_data = deque()
|
||
self.phase_data = deque()
|
||
|
||
# Initialize UI and device
|
||
self.setup_ui()
|
||
self.init_device()
|
||
|
||
# Set window properties
|
||
self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)")
|
||
self.resize(1000, 800)
|
||
self.setMinimumSize(800, 700)
|
||
|
||
# Status update timer
|
||
self.status_timer = QTimer()
|
||
self.status_timer.timeout.connect(self.update_status)
|
||
self.status_timer.start(1000) # Update every second
|
||
|
||
def setup_ui(self):
|
||
"""Configure the user interface"""
|
||
# Main widget and layout
|
||
self.central_widget = QWidget()
|
||
self.setCentralWidget(self.central_widget)
|
||
self.main_layout = QVBoxLayout(self.central_widget)
|
||
self.main_layout.setContentsMargins(10, 10, 10, 10)
|
||
|
||
# Header area
|
||
header_frame = QFrame()
|
||
header_frame.setFrameShape(QFrame.NoFrame)
|
||
header_layout = QHBoxLayout(header_frame)
|
||
header_layout.setContentsMargins(0, 0, 0, 0)
|
||
|
||
self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)")
|
||
self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};")
|
||
header_layout.addWidget(self.title_label, 1)
|
||
|
||
# Status indicator
|
||
self.status_light = QLabel()
|
||
self.status_light.setFixedSize(20, 20)
|
||
self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
|
||
header_layout.addWidget(self.status_light)
|
||
|
||
self.connection_label = QLabel("Disconnected")
|
||
header_layout.addWidget(self.connection_label)
|
||
|
||
# Reconnect button
|
||
self.reconnect_btn = QPushButton("Reconnect")
|
||
self.reconnect_btn.clicked.connect(self.reconnect_device)
|
||
header_layout.addWidget(self.reconnect_btn)
|
||
|
||
self.main_layout.addWidget(header_frame)
|
||
|
||
# Measurement display
|
||
display_frame = QFrame()
|
||
display_frame.setFrameShape(QFrame.StyledPanel)
|
||
display_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
|
||
display_layout = QGridLayout(display_frame)
|
||
|
||
# Measurement values
|
||
measurement_labels = [
|
||
("Voltage", "V"), ("Current", "A"), ("Test Phase", ""),
|
||
("Elapsed Time", "s"), ("Discharge Capacity", "Ah"), ("Charge Capacity", "Ah"),
|
||
("Coulomb Eff.", "%"), ("Cycle Count", ""), ("Battery Temp", "°C"),
|
||
("Internal R", "Ω"), ("Power", "W"), ("Energy", "Wh")
|
||
]
|
||
|
||
# 4 Zeilen × 3 Spalten Anordnung
|
||
for i, (label, unit) in enumerate(measurement_labels):
|
||
row = i // 3 # 0-3 (4 Zeilen)
|
||
col = (i % 3) * 3 # 0, 3, 6 (3 Spalten mit je 3 Widgets)
|
||
|
||
# Label für den Messwertnamen
|
||
lbl = QLabel(f"{label}:")
|
||
lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;")
|
||
display_layout.addWidget(lbl, row, col)
|
||
|
||
# Label für den Messwert
|
||
value_lbl = QLabel("0.000")
|
||
value_lbl.setStyleSheet(f"""
|
||
color: {self.fg_color};
|
||
font-weight: bold;
|
||
font-size: 12px;
|
||
min-width: 60px;
|
||
""")
|
||
display_layout.addWidget(value_lbl, row, col + 1)
|
||
|
||
# Einheit falls vorhanden
|
||
if unit:
|
||
unit_lbl = QLabel(unit)
|
||
unit_lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;")
|
||
display_layout.addWidget(unit_lbl, row, col + 2)
|
||
|
||
# Spaltenabstände anpassen
|
||
for i in range(9): # 3 Spalten × 3 Widgets
|
||
display_layout.setColumnStretch(i, 1 if i % 3 == 1 else 0) # Nur Wert-Spalten dehnen
|
||
|
||
# Referenzen aktualisieren
|
||
self.voltage_label = display_layout.itemAtPosition(0, 1).widget()
|
||
self.current_label = display_layout.itemAtPosition(0, 4).widget()
|
||
self.phase_label = display_layout.itemAtPosition(0, 7).widget()
|
||
self.time_label = display_layout.itemAtPosition(1, 1).widget()
|
||
self.capacity_label = display_layout.itemAtPosition(1, 4).widget()
|
||
self.charge_capacity_label = display_layout.itemAtPosition(1, 7).widget()
|
||
self.efficiency_label = display_layout.itemAtPosition(2, 1).widget()
|
||
self.cycle_label = display_layout.itemAtPosition(2, 4).widget()
|
||
self.temp_label = display_layout.itemAtPosition(2, 7).widget()
|
||
self.resistance_label = display_layout.itemAtPosition(3, 1).widget()
|
||
self.power_label = display_layout.itemAtPosition(3, 4).widget()
|
||
self.energy_label = display_layout.itemAtPosition(3, 7).widget()
|
||
|
||
self.main_layout.addWidget(display_frame)
|
||
|
||
# Control area
|
||
controls_frame = QFrame()
|
||
controls_frame.setFrameShape(QFrame.NoFrame)
|
||
controls_layout = QHBoxLayout(controls_frame)
|
||
controls_layout.setContentsMargins(0, 0, 0, 0)
|
||
|
||
# Parameters frame
|
||
params_frame = QFrame()
|
||
params_frame.setFrameShape(QFrame.StyledPanel)
|
||
params_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
|
||
params_layout = QGridLayout(params_frame)
|
||
|
||
# Battery capacity
|
||
self.capacity = 0.2
|
||
self.capacity_label_input = QLabel("Battery Capacity (Ah):")
|
||
self.capacity_label_input.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.capacity_label_input, 0, 0)
|
||
self.capacity_input = QLineEdit("0.2")
|
||
self.capacity_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.capacity_input.setFixedWidth(60)
|
||
params_layout.addWidget(self.capacity_input, 0, 1)
|
||
|
||
# Charge cutoff
|
||
self.charge_cutoff = 1.43
|
||
self.charge_cutoff_label = QLabel("Charge Cutoff (V):")
|
||
self.charge_cutoff_label.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.charge_cutoff_label, 1, 0)
|
||
self.charge_cutoff_input = QLineEdit("1.43")
|
||
self.charge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.charge_cutoff_input.setFixedWidth(60)
|
||
params_layout.addWidget(self.charge_cutoff_input, 1, 1)
|
||
|
||
# Discharge cutoff
|
||
self.discharge_cutoff = 0.9
|
||
self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):")
|
||
self.discharge_cutoff_label.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.discharge_cutoff_label, 2, 0)
|
||
self.discharge_cutoff_input = QLineEdit("0.9")
|
||
self.discharge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.discharge_cutoff_input.setFixedWidth(60)
|
||
params_layout.addWidget(self.discharge_cutoff_input, 2, 1)
|
||
|
||
# Rest time
|
||
self.rest_time = 0.25
|
||
self.rest_time_label = QLabel("Rest Time (hours):")
|
||
self.rest_time_label.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.rest_time_label, 3, 0)
|
||
self.rest_time_input = QLineEdit("0.25")
|
||
self.rest_time_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.rest_time_input.setFixedWidth(60)
|
||
params_layout.addWidget(self.rest_time_input, 3, 1)
|
||
|
||
# C-rate for test
|
||
self.c_rate = 0.1
|
||
self.c_rate_label = QLabel("Test C-rate:")
|
||
self.c_rate_label.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.c_rate_label, 0, 2)
|
||
self.c_rate_input = QLineEdit("0.1")
|
||
self.c_rate_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.c_rate_input.setFixedWidth(40)
|
||
params_layout.addWidget(self.c_rate_input, 0, 3)
|
||
|
||
c_rate_note = QLabel("(e.g., 0.2 for C/5)")
|
||
c_rate_note.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(c_rate_note, 0, 4)
|
||
|
||
controls_layout.addWidget(params_frame, 1)
|
||
|
||
# Test conditions input
|
||
self.test_conditions_label = QLabel("Test Conditions/Chemistry:")
|
||
self.test_conditions_label.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.test_conditions_label, 4, 0)
|
||
self.test_conditions_input = QLineEdit("")
|
||
self.test_conditions_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.test_conditions_input.setFixedWidth(120)
|
||
params_layout.addWidget(self.test_conditions_input, 4, 1)
|
||
|
||
# Button frame
|
||
button_frame = QFrame()
|
||
button_frame.setFrameShape(QFrame.NoFrame)
|
||
button_layout = QVBoxLayout(button_frame)
|
||
button_layout.setContentsMargins(0, 0, 0, 0)
|
||
|
||
self.start_button = QPushButton("START TEST")
|
||
self.start_button.setStyleSheet(f"""
|
||
QPushButton {{
|
||
background-color: {self.accent_color};
|
||
color: {self.fg_color};
|
||
font-weight: bold;
|
||
padding: 6px;
|
||
border-radius: 4px;
|
||
}}
|
||
QPushButton:disabled {{
|
||
background-color: #4C566A;
|
||
color: #D8DEE9;
|
||
}}
|
||
""")
|
||
self.start_button.clicked.connect(self.start_test)
|
||
button_layout.addWidget(self.start_button)
|
||
|
||
self.stop_button = QPushButton("STOP TEST")
|
||
self.stop_button.setStyleSheet(f"""
|
||
QPushButton {{
|
||
background-color: {self.warning_color};
|
||
color: {self.fg_color};
|
||
font-weight: bold;
|
||
padding: 6px;
|
||
border-radius: 4px;
|
||
}}
|
||
QPushButton:disabled {{
|
||
background-color: #4C566A;
|
||
color: #D8DEE9;
|
||
}}
|
||
""")
|
||
self.stop_button.clicked.connect(self.stop_test)
|
||
self.stop_button.setEnabled(False)
|
||
button_layout.addWidget(self.stop_button)
|
||
|
||
# Continuous mode checkbox
|
||
self.continuous_mode_check = QCheckBox("Continuous Mode")
|
||
self.continuous_mode_check.setChecked(True)
|
||
self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")
|
||
button_layout.addWidget(self.continuous_mode_check)
|
||
|
||
controls_layout.addWidget(button_frame)
|
||
self.main_layout.addWidget(controls_frame)
|
||
|
||
# Plot area
|
||
self.setup_plot()
|
||
|
||
# Status bar
|
||
self.status_bar = self.statusBar()
|
||
self.status_bar.setStyleSheet(f"color: {self.fg_color};")
|
||
self.status_bar.showMessage("Ready")
|
||
|
||
# Apply dark theme
|
||
self.setStyleSheet(f"""
|
||
QMainWindow {{
|
||
background-color: {self.bg_color};
|
||
}}
|
||
QLabel {{
|
||
color: {self.fg_color};
|
||
}}
|
||
QLineEdit {{
|
||
background-color: #3B4252;
|
||
color: {self.fg_color};
|
||
border: 1px solid #4C566A;
|
||
border-radius: 3px;
|
||
padding: 2px;
|
||
}}
|
||
""")
|
||
|
||
def setup_plot(self):
|
||
"""Configure the matplotlib plot"""
|
||
self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color)
|
||
self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
|
||
self.ax = self.fig.add_subplot(111)
|
||
self.ax.set_facecolor('#3B4252')
|
||
|
||
# Set initial voltage range
|
||
voltage_padding = 0.2
|
||
min_voltage = max(0, 0.9 - voltage_padding)
|
||
max_voltage = 1.43 + voltage_padding
|
||
self.ax.set_ylim(min_voltage, max_voltage)
|
||
|
||
# Voltage plot
|
||
self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2)
|
||
self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
|
||
self.ax.tick_params(axis='y', labelcolor='#00BFFF')
|
||
|
||
# Current plot (right axis)
|
||
self.ax2 = self.ax.twinx()
|
||
current_padding = 0.05
|
||
test_current = 0.1 * 0.2 # Default values
|
||
max_current = test_current * 1.5
|
||
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
|
||
|
||
self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
|
||
self.ax2.set_ylabel("Current (A)", color='r')
|
||
self.ax2.tick_params(axis='y', labelcolor='r')
|
||
|
||
self.ax.set_xlabel('Time (s)', color=self.fg_color)
|
||
self.ax.set_title('Battery Test (CC)', color=self.fg_color)
|
||
self.ax.tick_params(axis='x', colors=self.fg_color)
|
||
self.ax.grid(True, color='#4C566A')
|
||
|
||
# Position legends
|
||
self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
|
||
self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))
|
||
|
||
# Embed plot
|
||
self.canvas = FigureCanvas(self.fig)
|
||
self.canvas.setStyleSheet(f"background-color: {self.bg_color};")
|
||
self.main_layout.addWidget(self.canvas, 1)
|
||
|
||
def init_device(self):
|
||
"""Initialize the ADALM1000 device with continuous measurement"""
|
||
try:
|
||
# Clean up any existing session
|
||
if hasattr(self, 'session'):
|
||
try:
|
||
self.session.end()
|
||
del self.session
|
||
except:
|
||
pass
|
||
|
||
time.sleep(1)
|
||
|
||
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
|
||
if not self.session.devices:
|
||
raise Exception("No ADALM1000 detected - check connections")
|
||
|
||
self.dev = self.session.devices[0]
|
||
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
||
self.dev.channels['A'].constant(0)
|
||
self.dev.channels['B'].constant(0)
|
||
|
||
self.session.start(0)
|
||
|
||
self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;")
|
||
self.connection_label.setText("Connected")
|
||
self.status_bar.showMessage("Device connected | Ready to measure")
|
||
self.session_active = True
|
||
self.start_button.setEnabled(True)
|
||
|
||
# Start measurement thread
|
||
self.measurement_thread = MeasurementThread(self.dev, self.interval)
|
||
self.measurement_thread.update_signal.connect(self.update_measurements)
|
||
self.measurement_thread.error_signal.connect(self.handle_device_error)
|
||
|
||
# Start the QThread directly (no need for threading.Thread)
|
||
self.measurement_thread.start()
|
||
|
||
except Exception as e:
|
||
self.handle_device_error(str(e))
|
||
|
||
@pyqtSlot(float, float, float)
|
||
def update_measurements(self, voltage, current, current_time):
|
||
"""Update measurements from the measurement thread"""
|
||
self.time_data.append(current_time)
|
||
self.voltage_data.append(voltage)
|
||
self.current_data.append(current)
|
||
|
||
# Update display
|
||
self.voltage_label.setText(f"{voltage:.4f}")
|
||
self.current_label.setText(f"{current:.4f}")
|
||
self.time_label.setText(self.format_time(current_time))
|
||
|
||
# Throttle plot updates to avoid recursive repaint
|
||
now = time.time()
|
||
if not hasattr(self, '_last_plot_update'):
|
||
self._last_plot_update = 0
|
||
|
||
if now - self._last_plot_update > 0.1: # Update plot max 10 times per second
|
||
self._last_plot_update = now
|
||
QTimer.singleShot(0, self.update_plot)
|
||
|
||
def update_status(self):
|
||
"""Update status information periodically"""
|
||
if self.test_running:
|
||
# Update capacity calculations if in test mode
|
||
if self.measuring and self.time_data:
|
||
current_time = time.time() - self.start_time
|
||
delta_t = current_time - self.last_update_time
|
||
self.last_update_time = current_time
|
||
|
||
if self.test_phase == "Discharge":
|
||
current_current = abs(self.current_data[-1])
|
||
self.capacity_ah += current_current * delta_t / 3600
|
||
self.capacity_label.setText(f"{self.capacity_ah:.4f}")
|
||
elif self.test_phase == "Charge":
|
||
current_current = abs(self.current_data[-1])
|
||
self.charge_capacity += current_current * delta_t / 3600
|
||
self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}")
|
||
|
||
def start_test(self):
|
||
"""Start the full battery test cycle"""
|
||
if not self.test_running:
|
||
try:
|
||
# Get parameters from UI
|
||
self.capacity = float(self.capacity_input.text())
|
||
self.charge_cutoff = float(self.charge_cutoff_input.text())
|
||
self.discharge_cutoff = float(self.discharge_cutoff_input.text())
|
||
self.rest_time = float(self.rest_time_input.text())
|
||
self.c_rate = float(self.c_rate_input.text())
|
||
|
||
# Validate inputs
|
||
if self.capacity <= 0:
|
||
raise ValueError("Battery capacity must be positive")
|
||
if self.charge_cutoff <= self.discharge_cutoff:
|
||
raise ValueError("Charge cutoff must be higher than discharge cutoff")
|
||
if self.c_rate <= 0:
|
||
raise ValueError("C-rate must be positive")
|
||
|
||
test_current = self.c_rate * self.capacity
|
||
if test_current > 0.2:
|
||
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
|
||
|
||
# Clear previous data
|
||
self.time_data.clear()
|
||
self.voltage_data.clear()
|
||
self.current_data.clear()
|
||
self.phase_data.clear()
|
||
self.capacity_ah = 0.0
|
||
self.charge_capacity = 0.0
|
||
self.coulomb_efficiency = 0.0
|
||
self.cycle_count = 0
|
||
|
||
# Reset plot with proper ranges
|
||
self.reset_plot()
|
||
|
||
# Generate filename and create log file
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
|
||
self.create_cycle_log_file()
|
||
|
||
# Start test
|
||
self.test_running = True
|
||
self.start_time = time.time()
|
||
self.last_update_time = time.time()
|
||
self.test_phase = "Initial Discharge"
|
||
self.phase_label.setText(self.test_phase)
|
||
|
||
self.start_button.setEnabled(False)
|
||
self.stop_button.setEnabled(True)
|
||
self.status_bar.showMessage(f"Test started | Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A")
|
||
|
||
# Start test sequence in a QThread
|
||
self.test_sequence_thread = QThread()
|
||
self.test_sequence_worker = TestSequenceWorker(
|
||
self.dev,
|
||
test_current,
|
||
self.charge_cutoff,
|
||
self.discharge_cutoff,
|
||
self.rest_time,
|
||
self.continuous_mode_check.isChecked(),
|
||
self # Pass reference to main window for callbacks
|
||
)
|
||
self.test_sequence_worker.moveToThread(self.test_sequence_thread)
|
||
|
||
# Connect signals
|
||
self.test_sequence_worker.update_phase.connect(self.update_test_phase)
|
||
self.test_sequence_worker.update_status.connect(self.status_bar.showMessage)
|
||
self.test_sequence_worker.test_completed.connect(self.finalize_test)
|
||
self.test_sequence_worker.error_occurred.connect(self.handle_test_error)
|
||
self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit)
|
||
self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater)
|
||
self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater)
|
||
|
||
# Start the thread and the worker's run method
|
||
self.test_sequence_thread.start()
|
||
QTimer.singleShot(0, self.test_sequence_worker.run)
|
||
|
||
# Start capacity calculation timer if not already running
|
||
if not self.status_timer.isActive():
|
||
self.status_timer.start(1000)
|
||
|
||
except Exception as e:
|
||
QMessageBox.critical(self, "Error", str(e))
|
||
# Ensure buttons are in correct state if error occurs
|
||
self.start_button.setEnabled(True)
|
||
self.stop_button.setEnabled(False)
|
||
|
||
def create_cycle_log_file(self):
|
||
"""Create a new log file for the current cycle"""
|
||
try:
|
||
# Close previous file if exists
|
||
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
|
||
try:
|
||
self.current_cycle_file.close()
|
||
except Exception as e:
|
||
print(f"Error closing previous log file: {e}")
|
||
|
||
# Ensure log directory exists
|
||
os.makedirs(self.log_dir, exist_ok=True)
|
||
|
||
if not os.access(self.log_dir, os.W_OK):
|
||
QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}")
|
||
return False
|
||
|
||
# Generate unique filename
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv")
|
||
|
||
# Open new file
|
||
try:
|
||
self.current_cycle_file = open(self.filename, 'w', newline='')
|
||
|
||
# Write header with test parameters
|
||
test_current = self.c_rate * self.capacity
|
||
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
|
||
|
||
self.current_cycle_file.write(f"# ADALM1000 Battery Test Log\n")
|
||
self.current_cycle_file.write(f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||
self.current_cycle_file.write(f"# Battery Capacity: {self.capacity} Ah\n")
|
||
self.current_cycle_file.write(f"# Test Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n")
|
||
self.current_cycle_file.write(f"# Charge Cutoff: {self.charge_cutoff} V\n")
|
||
self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n")
|
||
self.current_cycle_file.write(f"# Rest Time: {self.rest_time} hours\n")
|
||
self.current_cycle_file.write(f"# Test Conditions/Chemistry: {test_conditions}\n")
|
||
self.current_cycle_file.write("#\n")
|
||
|
||
# Write data header
|
||
self.log_writer = csv.writer(self.current_cycle_file)
|
||
self.log_writer.writerow([
|
||
"Time(s)", "Voltage(V)", "Current(A)", "Phase",
|
||
"Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
|
||
"Coulomb_Eff(%)", "Cycle"
|
||
])
|
||
self.log_buffer = []
|
||
return True
|
||
except Exception as e:
|
||
QMessageBox.critical(self, "Error", f"Failed to create log file: {e}")
|
||
return False
|
||
except Exception as e:
|
||
print(f"Error in create_cycle_log_file: {e}")
|
||
return False
|
||
|
||
def format_time(self, seconds):
|
||
"""Convert seconds to hh:mm:ss format"""
|
||
hours = int(seconds // 3600)
|
||
minutes = int((seconds % 3600) // 60)
|
||
seconds = int(seconds % 60)
|
||
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
|
||
|
||
def stop_test(self):
|
||
"""Request immediate stop of the test"""
|
||
if not self.test_running:
|
||
return
|
||
|
||
self.request_stop = True
|
||
self.test_running = False
|
||
self.measuring = False
|
||
self.test_phase = "Idle"
|
||
self.phase_label.setText(self.test_phase)
|
||
|
||
if hasattr(self, 'dev'):
|
||
try:
|
||
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.dev.channels['A'].constant(0)
|
||
except Exception as e:
|
||
print(f"Error resetting device: {e}")
|
||
|
||
self.time_data.clear()
|
||
self.voltage_data.clear()
|
||
self.current_data.clear()
|
||
self.phase_data.clear()
|
||
|
||
self.capacity_ah = 0.0
|
||
self.charge_capacity = 0.0
|
||
self.coulomb_efficiency = 0.0
|
||
|
||
self.reset_plot()
|
||
|
||
self.status_bar.showMessage("Test stopped - Ready for new test")
|
||
self.stop_button.setEnabled(False)
|
||
self.start_button.setEnabled(True)
|
||
|
||
self.finalize_test()
|
||
|
||
def finalize_test(self):
|
||
"""Final cleanup after test completes or is stopped"""
|
||
self.measuring = False
|
||
if hasattr(self, 'dev'):
|
||
try:
|
||
self.dev.channels['A'].constant(0)
|
||
except Exception as e:
|
||
print(f"Error resetting device: {e}")
|
||
|
||
# Only try to close if file exists and is open
|
||
if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
|
||
try:
|
||
if self.log_buffer:
|
||
self.log_writer.writerows(self.log_buffer)
|
||
self.log_buffer.clear()
|
||
|
||
# Write test summary
|
||
test_current = self.c_rate * self.capacity
|
||
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
|
||
|
||
self.current_cycle_file.write("\n# TEST SUMMARY\n")
|
||
self.current_cycle_file.write(f"# Test Parameters:\n")
|
||
self.current_cycle_file.write(f"# - Battery Capacity: {self.capacity} Ah\n")
|
||
self.current_cycle_file.write(f"# - Test Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n")
|
||
self.current_cycle_file.write(f"# - Charge Cutoff: {self.charge_cutoff} V\n")
|
||
self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n")
|
||
self.current_cycle_file.write(f"# - Test Conditions: {test_conditions}\n")
|
||
self.current_cycle_file.write(f"# Results:\n")
|
||
self.current_cycle_file.write(f"# - Cycles Completed: {self.cycle_count}\n")
|
||
self.current_cycle_file.write(f"# - Final Discharge Capacity: {self.capacity_ah:.4f} Ah\n")
|
||
self.current_cycle_file.write(f"# - Final Charge Capacity: {self.charge_capacity:.4f} Ah\n")
|
||
self.current_cycle_file.write(f"# - Coulombic Efficiency: {self.coulomb_efficiency:.1f}%\n")
|
||
|
||
self.current_cycle_file.close()
|
||
except Exception as e:
|
||
print(f"Error closing log file: {e}")
|
||
finally:
|
||
self.current_cycle_file = None
|
||
|
||
self.start_button.setEnabled(True)
|
||
self.stop_button.setEnabled(False)
|
||
self.request_stop = False
|
||
|
||
message = (
|
||
f"Test safely stopped after discharge phase | "
|
||
f"Cycle {self.cycle_count} completed | "
|
||
f"Final capacity: {self.capacity_ah:.3f}Ah"
|
||
)
|
||
self.status_bar.showMessage(message)
|
||
|
||
QMessageBox.information(
|
||
self,
|
||
"Test Completed",
|
||
f"Test was safely stopped after discharge phase.\n\n"
|
||
f"Test Parameters:\n"
|
||
f"- Capacity: {self.capacity} Ah\n"
|
||
f"- Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n"
|
||
f"- Charge Cutoff: {self.charge_cutoff} V\n"
|
||
f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
|
||
f"- Conditions: {test_conditions}\n\n"
|
||
f"Results:\n"
|
||
f"- Cycles: {self.cycle_count}\n"
|
||
f"- Discharge capacity: {self.capacity_ah:.3f}Ah\n"
|
||
f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%"
|
||
)
|
||
|
||
def reset_plot(self):
|
||
"""Reset the plot completely for a new test"""
|
||
self.line_voltage.set_data([], [])
|
||
self.line_current.set_data([], [])
|
||
|
||
self.time_data.clear()
|
||
self.voltage_data.clear()
|
||
self.current_data.clear()
|
||
|
||
voltage_padding = 0.2
|
||
min_voltage = max(0, self.discharge_cutoff - voltage_padding)
|
||
max_voltage = self.charge_cutoff + voltage_padding
|
||
self.ax.set_xlim(0, 10)
|
||
self.ax.set_ylim(min_voltage, max_voltage)
|
||
|
||
current_padding = 0.05
|
||
test_current = self.c_rate * self.capacity
|
||
max_current = test_current * 1.5
|
||
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
|
||
|
||
self.canvas.draw()
|
||
|
||
def write_cycle_summary(self):
|
||
"""Write cycle summary to the current cycle's log file"""
|
||
if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file:
|
||
return
|
||
|
||
summary_line = (
|
||
f"Cycle {self.cycle_count} Summary - "
|
||
f"Discharge={self.capacity_ah:.4f}Ah, "
|
||
f"Charge={self.charge_capacity:.4f}Ah, "
|
||
f"Efficiency={self.coulomb_efficiency:.1f}%"
|
||
)
|
||
|
||
try:
|
||
if self.log_buffer:
|
||
self.log_writer.writerows(self.log_buffer)
|
||
self.log_buffer.clear()
|
||
self.current_cycle_file.write(summary_line + "\n")
|
||
self.current_cycle_file.flush()
|
||
except Exception as e:
|
||
print(f"Error writing cycle summary: {e}")
|
||
|
||
def update_plot(self):
|
||
"""More reliable plotting with better error handling"""
|
||
try:
|
||
# Create local copies safely
|
||
with self.plot_mutex:
|
||
if not self.time_data or not self.voltage_data or not self.current_data:
|
||
return
|
||
|
||
if len(self.time_data) != len(self.voltage_data) or len(self.time_data) != len(self.current_data):
|
||
# Find the minimum length to avoid mismatch
|
||
min_len = min(len(self.time_data), len(self.voltage_data), len(self.current_data))
|
||
x_data = np.array(self.time_data[-min_len:])
|
||
y1_data = np.array(self.voltage_data[-min_len:])
|
||
y2_data = np.array(self.current_data[-min_len:])
|
||
else:
|
||
x_data = np.array(self.time_data)
|
||
y1_data = np.array(self.voltage_data)
|
||
y2_data = np.array(self.current_data)
|
||
|
||
# Update plot data
|
||
self.line_voltage.set_data(x_data, y1_data)
|
||
self.line_current.set_data(x_data, y2_data)
|
||
|
||
# Auto-scale when needed
|
||
if len(x_data) > 0 and x_data[-1] > self.ax.get_xlim()[1] * 0.8:
|
||
self.auto_scale_axes()
|
||
|
||
# Force redraw
|
||
self.canvas.draw_idle()
|
||
|
||
except Exception as e:
|
||
print(f"Plot error: {e}")
|
||
# Reset plot on error
|
||
self.line_voltage.set_data([], [])
|
||
self.line_current.set_data([], [])
|
||
self.canvas.draw_idle()
|
||
|
||
def auto_scale_axes(self):
|
||
"""Auto-scale plot axes with appropriate padding and strict boundaries"""
|
||
if not self.time_data:
|
||
return
|
||
|
||
min_time = 0
|
||
max_time = self.time_data[-1]
|
||
current_xlim = self.ax.get_xlim()
|
||
|
||
if max_time > current_xlim[1] * 0.95:
|
||
new_max = max_time * 1.05
|
||
self.ax.set_xlim(min_time, new_max)
|
||
self.ax2.set_xlim(min_time, new_max)
|
||
|
||
voltage_padding = 0.2
|
||
if self.voltage_data:
|
||
min_voltage = max(0, min(self.voltage_data) - voltage_padding)
|
||
max_voltage = min(5.0, max(self.voltage_data) + voltage_padding)
|
||
current_ylim = self.ax.get_ylim()
|
||
if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1):
|
||
self.ax.set_ylim(min_voltage, max_voltage)
|
||
|
||
current_padding = 0.05
|
||
if self.current_data:
|
||
min_current = max(-0.25, min(self.current_data) - current_padding)
|
||
max_current = min(0.25, max(self.current_data) + current_padding)
|
||
current_ylim2 = self.ax2.get_ylim()
|
||
if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02):
|
||
self.ax2.set_ylim(min_current, max_current)
|
||
|
||
@pyqtSlot(str)
|
||
def handle_device_error(self, error):
|
||
"""Handle device connection errors"""
|
||
error_msg = str(error)
|
||
print(f"Device error: {error_msg}")
|
||
|
||
if hasattr(self, 'session'):
|
||
try:
|
||
if self.session_active:
|
||
self.session.end()
|
||
del self.session
|
||
except Exception as e:
|
||
print(f"Error cleaning up session: {e}")
|
||
|
||
self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;")
|
||
self.connection_label.setText("Disconnected")
|
||
self.status_bar.showMessage(f"Device error: {error_msg}")
|
||
|
||
self.session_active = False
|
||
self.test_running = False
|
||
self.continuous_mode = False
|
||
self.measuring = False
|
||
|
||
self.start_button.setEnabled(False)
|
||
self.stop_button.setEnabled(False)
|
||
|
||
self.time_data.clear()
|
||
self.voltage_data.clear()
|
||
self.current_data.clear()
|
||
|
||
@pyqtSlot(str)
|
||
def update_test_phase(self, phase_text):
|
||
"""Update the test phase display"""
|
||
self.test_phase = phase_text
|
||
self.phase_label.setText(phase_text)
|
||
|
||
# Update log if available
|
||
if hasattr(self, 'log_buffer'):
|
||
current_time = time.time() - self.start_time
|
||
self.log_buffer.append([
|
||
f"{current_time:.3f}",
|
||
"",
|
||
"",
|
||
phase_text,
|
||
f"{self.capacity_ah:.4f}",
|
||
f"{self.charge_capacity:.4f}",
|
||
f"{self.coulomb_efficiency:.1f}" if hasattr(self, 'coulomb_efficiency') else "0.0",
|
||
f"{self.cycle_count}"
|
||
])
|
||
|
||
@pyqtSlot(str)
|
||
def handle_test_error(self, error_msg):
|
||
"""Handle errors from the test sequence with complete cleanup"""
|
||
try:
|
||
# 1. Notify user
|
||
QMessageBox.critical(self, "Test Error",
|
||
f"An error occurred:\n{error_msg}\n\nAttempting to recover...")
|
||
|
||
# 2. Stop all operations
|
||
self.stop_test()
|
||
|
||
# 3. Reset UI elements
|
||
if hasattr(self, 'line_voltage'):
|
||
try:
|
||
self.line_voltage.set_data([], [])
|
||
self.line_current.set_data([], [])
|
||
self.ax.set_xlim(0, 1)
|
||
self.ax2.set_xlim(0, 1)
|
||
self.canvas.draw()
|
||
except Exception as plot_error:
|
||
print(f"Plot reset error: {plot_error}")
|
||
|
||
# 4. Update status
|
||
self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...")
|
||
self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")
|
||
|
||
# 5. Attempt recovery
|
||
QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect
|
||
|
||
except Exception as e:
|
||
print(f"Error in error handler: {e}")
|
||
# Fallback - restart application?
|
||
QMessageBox.critical(self, "Fatal Error",
|
||
"The application needs to restart due to an unrecoverable error")
|
||
QTimer.singleShot(1000, self.close)
|
||
|
||
def attempt_reconnect(self):
|
||
"""Attempt to reconnect automatically"""
|
||
QMessageBox.critical(
|
||
self,
|
||
"Device Connection Error",
|
||
"Could not connect to ADALM1000\n\n"
|
||
"1. Check USB cable connection\n"
|
||
"2. The device will attempt to reconnect automatically"
|
||
)
|
||
|
||
QTimer.singleShot(1000, self.reconnect_device)
|
||
|
||
def reconnect_device(self):
|
||
"""Reconnect the device with proper cleanup"""
|
||
self.status_bar.showMessage("Attempting to reconnect...")
|
||
|
||
if hasattr(self, 'session'):
|
||
try:
|
||
if self.session_active:
|
||
self.session.end()
|
||
del self.session
|
||
except:
|
||
pass
|
||
|
||
self.test_running = False
|
||
self.continuous_mode = False
|
||
self.measuring = False
|
||
|
||
if hasattr(self, 'measurement_thread'):
|
||
self.measurement_thread.stop()
|
||
|
||
time.sleep(1.5)
|
||
|
||
try:
|
||
self.init_device()
|
||
if self.session_active:
|
||
self.status_bar.showMessage("Reconnected successfully")
|
||
return
|
||
except Exception as e:
|
||
print(f"Reconnect failed: {e}")
|
||
|
||
self.status_bar.showMessage("Reconnect failed - will retry...")
|
||
QTimer.singleShot(2000, self.reconnect_device)
|
||
|
||
def closeEvent(self, event):
|
||
"""Clean up on window close"""
|
||
self.test_running = False
|
||
self.measuring = False
|
||
self.session_active = False
|
||
|
||
# Stop measurement thread
|
||
if hasattr(self, 'measurement_thread'):
|
||
self.measurement_thread.stop()
|
||
|
||
# Stop test sequence thread
|
||
if hasattr(self, 'test_sequence_thread'):
|
||
if hasattr(self, 'test_sequence_worker'):
|
||
self.test_sequence_worker.stop()
|
||
self.test_sequence_thread.quit()
|
||
self.test_sequence_thread.wait(500)
|
||
|
||
# Clean up device session
|
||
if hasattr(self, 'session') and self.session:
|
||
try:
|
||
self.session.end()
|
||
except Exception as e:
|
||
print(f"Error ending session: {e}")
|
||
|
||
event.accept()
|
||
|
||
if __name__ == "__main__":
|
||
app = QApplication([])
|
||
try:
|
||
window = BatteryTester()
|
||
window.show()
|
||
app.exec_()
|
||
except Exception as e:
|
||
QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}") |