These changes should:
Make the plot start at zero correctly
Prevent crashes when stopping tests
Allow multiple test cycles to run properly
Maintain stable operation during start/stop sequences
C&D
1330 lines
55 KiB
Python
1330 lines
55 KiB
Python
# -*- coding: utf-8 -*-
|
||
import os
|
||
import time
|
||
import csv
|
||
import threading
|
||
from datetime import datetime
|
||
import numpy as np
|
||
import matplotlib
|
||
matplotlib.use('Qt5Agg')
|
||
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
|
||
from matplotlib.figure import Figure
|
||
from collections import deque
|
||
from queue import Queue, Full, Empty
|
||
|
||
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel,
|
||
QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog)
|
||
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread
|
||
from PyQt5 import sip
|
||
import pysmu
|
||
|
||
class DeviceDisconnectedError(Exception):
|
||
pass
|
||
|
||
class MeasurementThread(QThread):
|
||
update_signal = pyqtSignal(float, float, float)
|
||
error_signal = pyqtSignal(str)
|
||
|
||
def __init__(self, device, interval=0.1):
|
||
super().__init__()
|
||
self.device = device
|
||
self.interval = interval
|
||
self._running = False
|
||
self.filter_window_size = 10
|
||
self.voltage_window = []
|
||
self.current_window = []
|
||
self.start_time = time.time()
|
||
self.measurement_queue = Queue(maxsize=1)
|
||
|
||
def run(self):
|
||
"""Continuous measurement loop"""
|
||
self._running = True
|
||
while self._running:
|
||
try:
|
||
samples = self.device.read(self.filter_window_size, 500, True)
|
||
if not samples:
|
||
raise DeviceDisconnectedError("No samples received")
|
||
|
||
current_time = time.time() - self.start_time
|
||
|
||
# Get voltage from Channel B (HI_Z mode) and current from Channel A
|
||
raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage
|
||
raw_current = np.mean([s[0][1] for s in samples]) # Channel A current
|
||
|
||
# Apply sign correction based on test phase if available
|
||
if hasattr(self, 'parent') and hasattr(self.parent, 'test_phase'):
|
||
if self.parent.test_phase == "Discharge":
|
||
raw_current = -abs(raw_current)
|
||
elif self.parent.test_phase == "Charge":
|
||
raw_current = abs(raw_current)
|
||
|
||
# Update filter windows
|
||
self.voltage_window.append(raw_voltage)
|
||
self.current_window.append(raw_current)
|
||
|
||
if len(self.voltage_window) > self.filter_window_size:
|
||
self.voltage_window.pop(0)
|
||
self.current_window.pop(0)
|
||
|
||
voltage = np.mean(self.voltage_window)
|
||
current = np.mean(self.current_window)
|
||
|
||
# Validate measurements
|
||
if not (0 <= voltage <= 5.0):
|
||
raise ValueError(f"Invalid voltage: {voltage}V")
|
||
if not (-0.25 <= current <= 0.25):
|
||
raise ValueError(f"Invalid current: {current}A")
|
||
|
||
# Emit update
|
||
self.update_signal.emit(voltage, current, current_time)
|
||
|
||
# Store measurement
|
||
try:
|
||
self.measurement_queue.put_nowait((voltage, current))
|
||
except Full:
|
||
pass
|
||
|
||
time.sleep(max(0.05, self.interval))
|
||
|
||
except Exception as e:
|
||
self.error_signal.emit(f"Read error: {str(e)}")
|
||
time.sleep(1)
|
||
continue
|
||
|
||
def stop(self):
|
||
self._running = False
|
||
self.wait(500)
|
||
|
||
|
||
class TestSequenceWorker(QObject):
|
||
# ... keep all existing methods except the run() method ...
|
||
|
||
def run(self):
|
||
"""Main test sequence loop"""
|
||
try:
|
||
while self._running and (self.continuous_mode or self.parent.cycle_count == 0):
|
||
# Reset stop request at start of each cycle
|
||
self.parent.request_stop = False
|
||
self.parent.cycle_count += 1
|
||
|
||
# 1. Charge phase (constant current)
|
||
self.charge_phase()
|
||
if not self._running or self.parent.request_stop:
|
||
break
|
||
|
||
# 2. Rest period after charge
|
||
self.rest_phase("Post-Charge")
|
||
if not self._running or self.parent.request_stop:
|
||
break
|
||
|
||
# 3. Discharge phase (capacity measurement)
|
||
self.discharge_phase()
|
||
if not self._running or self.parent.request_stop:
|
||
break
|
||
|
||
# 4. Rest period after discharge (only if not stopping)
|
||
if self._running and not self.parent.request_stop:
|
||
self.rest_phase("Post-Discharge")
|
||
|
||
# Calculate Coulomb efficiency if not stopping
|
||
if not self.parent.request_stop and self.parent.charge_capacity > 0:
|
||
self.parent.coulomb_efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100
|
||
|
||
# Test completed
|
||
self.test_completed.emit()
|
||
|
||
except Exception as e:
|
||
self.error_occurred.emit(f"Test sequence error: {str(e)}")
|
||
finally:
|
||
self.finished.emit()
|
||
|
||
class TestSequenceWorker(QObject):
|
||
finished = pyqtSignal()
|
||
update_phase = pyqtSignal(str)
|
||
update_status = pyqtSignal(str)
|
||
test_completed = pyqtSignal()
|
||
error_occurred = pyqtSignal(str)
|
||
|
||
def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent):
|
||
super().__init__()
|
||
self.device = device
|
||
self.test_current = test_current
|
||
self.charge_cutoff = charge_cutoff
|
||
self.discharge_cutoff = discharge_cutoff
|
||
self.rest_time = rest_time * 3600 # Convert hours to seconds
|
||
self.continuous_mode = continuous_mode
|
||
self.parent = parent
|
||
self._running = True
|
||
self.voltage_timeout = 0.5 # seconds
|
||
|
||
def get_latest_measurement(self):
|
||
"""Thread-safe measurement reading with timeout"""
|
||
try:
|
||
return self.parent.measurement_thread.measurement_queue.get(
|
||
timeout=self.voltage_timeout
|
||
)
|
||
except Empty:
|
||
return (None, None) # Return tuple for unpacking
|
||
|
||
def charge_phase(self):
|
||
"""Handle the battery charging phase"""
|
||
self.update_phase.emit("Charge")
|
||
self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.3f}A")
|
||
|
||
try:
|
||
# Configure channels - Channel A sources current, Channel B measures voltage
|
||
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
||
self.device.channels['A'].mode = pysmu.Mode.SIMV
|
||
self.device.channels['A'].constant(self.test_current)
|
||
|
||
# Small delay to allow current to stabilize
|
||
time.sleep(0.1)
|
||
|
||
while self._running:
|
||
voltage, current = self.get_latest_measurement()
|
||
if voltage is None:
|
||
continue
|
||
|
||
# Update parent's data for logging/display
|
||
with self.parent.plot_mutex:
|
||
if len(self.parent.voltage_data) > 0:
|
||
self.parent.voltage_data[-1] = voltage
|
||
self.parent.current_data[-1] = current
|
||
|
||
if voltage >= self.charge_cutoff:
|
||
break
|
||
|
||
time.sleep(0.1)
|
||
|
||
finally:
|
||
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.device.channels['A'].constant(0)
|
||
|
||
def discharge_phase(self):
|
||
"""Handle the battery discharging phase"""
|
||
voltage, _ = self.get_latest_measurement()
|
||
if voltage is not None and voltage <= self.discharge_cutoff:
|
||
self.update_status.emit(f"Already below discharge cutoff ({voltage:.3f}V ≤ {self.discharge_cutoff}V)")
|
||
return
|
||
self.update_phase.emit("Discharge")
|
||
self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A")
|
||
|
||
try:
|
||
# Configure channels - Channel A sinks current, Channel B measures voltage
|
||
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
||
self.device.channels['A'].mode = pysmu.Mode.SIMV
|
||
self.device.channels['A'].constant(-self.test_current)
|
||
|
||
# Small delay to allow current to stabilize
|
||
time.sleep(0.1)
|
||
|
||
while self._running:
|
||
voltage, current = self.get_latest_measurement()
|
||
if voltage is None:
|
||
continue
|
||
|
||
# Update parent's data for logging/display
|
||
with self.parent.plot_mutex:
|
||
if len(self.parent.voltage_data) > 0:
|
||
self.parent.voltage_data[-1] = voltage
|
||
self.parent.current_data[-1] = current
|
||
|
||
if voltage <= self.discharge_cutoff:
|
||
break
|
||
|
||
time.sleep(0.1)
|
||
|
||
finally:
|
||
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.device.channels['A'].constant(0)
|
||
|
||
def rest_phase(self, phase_name):
|
||
"""Handle rest period between phases"""
|
||
self.update_phase.emit(f"Resting ({phase_name})")
|
||
rest_end = time.time() + self.rest_time
|
||
|
||
while time.time() < rest_end and self._running:
|
||
time_left = max(0, rest_end - time.time())
|
||
self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min")
|
||
time.sleep(1)
|
||
|
||
def stop(self):
|
||
"""Request the thread to stop"""
|
||
self._running = False
|
||
try:
|
||
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.device.channels['A'].constant(0)
|
||
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
||
except Exception as e:
|
||
print(f"Error stopping device: {e}")
|
||
|
||
def run(self):
|
||
"""Main test sequence loop"""
|
||
try:
|
||
while self._running and (self.continuous_mode or self.parent.cycle_count == 0):
|
||
# Reset stop request at start of each cycle
|
||
self.parent.request_stop = False
|
||
self.parent.cycle_count += 1
|
||
|
||
# 1. Charge phase (constant current)
|
||
self.charge_phase()
|
||
if not self._running or self.parent.request_stop:
|
||
break
|
||
|
||
# 2. Rest period after charge
|
||
self.rest_phase("Post-Charge")
|
||
if not self._running or self.parent.request_stop:
|
||
break
|
||
|
||
# 3. Discharge phase (capacity measurement)
|
||
self.discharge_phase()
|
||
if not self._running or self.parent.request_stop:
|
||
break
|
||
|
||
# 4. Rest period after discharge (only if not stopping)
|
||
if self._running and not self.parent.request_stop:
|
||
self.rest_phase("Post-Discharge")
|
||
|
||
# Calculate Coulomb efficiency if not stopping
|
||
if not self.parent.request_stop and self.parent.charge_capacity > 0:
|
||
self.parent.coulomb_efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100
|
||
|
||
# Test completed
|
||
self.test_completed.emit()
|
||
|
||
except Exception as e:
|
||
self.error_occurred.emit(f"Test sequence error: {str(e)}")
|
||
finally:
|
||
self.finished.emit()
|
||
|
||
class BatteryTester(QMainWindow):
|
||
def __init__(self):
|
||
self.plot_mutex = threading.Lock()
|
||
super().__init__()
|
||
|
||
# Color scheme
|
||
self.bg_color = "#2E3440"
|
||
self.fg_color = "#D8DEE9"
|
||
self.accent_color = "#5E81AC"
|
||
self.warning_color = "#BF616A"
|
||
self.success_color = "#A3BE8C"
|
||
|
||
# Device and measurement state
|
||
self.session_active = False
|
||
self.measuring = False
|
||
self.test_running = False
|
||
self.continuous_mode = False
|
||
self.request_stop = False
|
||
self.interval = 0.1
|
||
self.log_dir = os.path.expanduser("~/adalm1000/logs")
|
||
os.makedirs(self.log_dir, exist_ok=True)
|
||
|
||
# Data buffers
|
||
self.time_data = deque()
|
||
self.voltage_data = deque()
|
||
self.current_data = deque()
|
||
self.phase_data = deque()
|
||
|
||
# Initialize UI and device
|
||
self.setup_ui()
|
||
self.init_device()
|
||
|
||
# Set window properties
|
||
self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)")
|
||
self.resize(1000, 800)
|
||
self.setMinimumSize(800, 700)
|
||
|
||
# Status update timer
|
||
self.status_timer = QTimer()
|
||
self.status_timer.timeout.connect(self.update_status)
|
||
self.status_timer.start(1000) # Update every second
|
||
|
||
def setup_ui(self):
|
||
"""Configure the user interface"""
|
||
# Main widget and layout
|
||
self.central_widget = QWidget()
|
||
self.setCentralWidget(self.central_widget)
|
||
self.main_layout = QVBoxLayout(self.central_widget)
|
||
self.main_layout.setContentsMargins(10, 10, 10, 10)
|
||
|
||
# Header area
|
||
header_frame = QFrame()
|
||
header_frame.setFrameShape(QFrame.NoFrame)
|
||
header_layout = QHBoxLayout(header_frame)
|
||
header_layout.setContentsMargins(0, 0, 0, 0)
|
||
|
||
self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)")
|
||
self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};")
|
||
header_layout.addWidget(self.title_label, 1)
|
||
|
||
# Status indicator
|
||
self.status_light = QLabel()
|
||
self.status_light.setFixedSize(20, 20)
|
||
self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
|
||
header_layout.addWidget(self.status_light)
|
||
|
||
self.connection_label = QLabel("Disconnected")
|
||
header_layout.addWidget(self.connection_label)
|
||
|
||
# Reconnect button
|
||
self.reconnect_btn = QPushButton("Reconnect")
|
||
self.reconnect_btn.clicked.connect(self.reconnect_device)
|
||
header_layout.addWidget(self.reconnect_btn)
|
||
|
||
self.main_layout.addWidget(header_frame)
|
||
|
||
# Measurement display
|
||
display_frame = QFrame()
|
||
display_frame.setFrameShape(QFrame.StyledPanel)
|
||
display_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
|
||
display_layout = QGridLayout(display_frame)
|
||
|
||
# Measurement values
|
||
measurement_labels = [
|
||
("Voltage", "V"), ("Current", "A"), ("Test Phase", ""),
|
||
("Elapsed Time", "s"), ("Discharge Capacity", "Ah"), ("Charge Capacity", "Ah"),
|
||
("Coulomb Eff.", "%"), ("Cycle Count", ""), ("Battery Temp", "°C"),
|
||
("Internal R", "Ω"), ("Power", "W"), ("Energy", "Wh")
|
||
]
|
||
|
||
# 4 Zeilen × 3 Spalten Anordnung
|
||
for i, (label, unit) in enumerate(measurement_labels):
|
||
row = i // 3 # 0-3 (4 Zeilen)
|
||
col = (i % 3) * 3 # 0, 3, 6 (3 Spalten mit je 3 Widgets)
|
||
|
||
# Label für den Messwertnamen
|
||
lbl = QLabel(f"{label}:")
|
||
lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;")
|
||
display_layout.addWidget(lbl, row, col)
|
||
|
||
# Label für den Messwert
|
||
value_lbl = QLabel("0.000")
|
||
value_lbl.setStyleSheet(f"""
|
||
color: {self.fg_color};
|
||
font-weight: bold;
|
||
font-size: 12px;
|
||
min-width: 60px;
|
||
""")
|
||
display_layout.addWidget(value_lbl, row, col + 1)
|
||
|
||
# Einheit falls vorhanden
|
||
if unit:
|
||
unit_lbl = QLabel(unit)
|
||
unit_lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;")
|
||
display_layout.addWidget(unit_lbl, row, col + 2)
|
||
|
||
# Spaltenabstände anpassen
|
||
for i in range(9): # 3 Spalten × 3 Widgets
|
||
display_layout.setColumnStretch(i, 1 if i % 3 == 1 else 0) # Nur Wert-Spalten dehnen
|
||
|
||
# Referenzen aktualisieren
|
||
self.voltage_label = display_layout.itemAtPosition(0, 1).widget()
|
||
self.current_label = display_layout.itemAtPosition(0, 4).widget()
|
||
self.phase_label = display_layout.itemAtPosition(0, 7).widget()
|
||
self.time_label = display_layout.itemAtPosition(1, 1).widget()
|
||
self.capacity_label = display_layout.itemAtPosition(1, 4).widget()
|
||
self.charge_capacity_label = display_layout.itemAtPosition(1, 7).widget()
|
||
self.efficiency_label = display_layout.itemAtPosition(2, 1).widget()
|
||
self.cycle_label = display_layout.itemAtPosition(2, 4).widget()
|
||
self.temp_label = display_layout.itemAtPosition(2, 7).widget()
|
||
self.resistance_label = display_layout.itemAtPosition(3, 1).widget()
|
||
self.power_label = display_layout.itemAtPosition(3, 4).widget()
|
||
self.energy_label = display_layout.itemAtPosition(3, 7).widget()
|
||
|
||
self.main_layout.addWidget(display_frame)
|
||
|
||
# Control area
|
||
controls_frame = QFrame()
|
||
controls_frame.setFrameShape(QFrame.NoFrame)
|
||
controls_layout = QHBoxLayout(controls_frame)
|
||
controls_layout.setContentsMargins(0, 0, 0, 0)
|
||
|
||
# Parameters frame
|
||
params_frame = QFrame()
|
||
params_frame.setFrameShape(QFrame.StyledPanel)
|
||
params_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
|
||
params_layout = QGridLayout(params_frame)
|
||
|
||
# Battery capacity
|
||
self.capacity = 0.2
|
||
self.capacity_label_input = QLabel("Battery Capacity (Ah):")
|
||
self.capacity_label_input.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.capacity_label_input, 0, 0)
|
||
self.capacity_input = QLineEdit("0.2")
|
||
self.capacity_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.capacity_input.setFixedWidth(60)
|
||
params_layout.addWidget(self.capacity_input, 0, 1)
|
||
|
||
# Charge cutoff
|
||
self.charge_cutoff = 1.43
|
||
self.charge_cutoff_label = QLabel("Charge Cutoff (V):")
|
||
self.charge_cutoff_label.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.charge_cutoff_label, 1, 0)
|
||
self.charge_cutoff_input = QLineEdit("1.43")
|
||
self.charge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.charge_cutoff_input.setFixedWidth(60)
|
||
params_layout.addWidget(self.charge_cutoff_input, 1, 1)
|
||
|
||
# Discharge cutoff
|
||
self.discharge_cutoff = 0.9
|
||
self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):")
|
||
self.discharge_cutoff_label.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.discharge_cutoff_label, 2, 0)
|
||
self.discharge_cutoff_input = QLineEdit("0.9")
|
||
self.discharge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.discharge_cutoff_input.setFixedWidth(60)
|
||
params_layout.addWidget(self.discharge_cutoff_input, 2, 1)
|
||
|
||
# Rest time
|
||
self.rest_time = 0.25
|
||
self.rest_time_label = QLabel("Rest Time (hours):")
|
||
self.rest_time_label.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.rest_time_label, 3, 0)
|
||
self.rest_time_input = QLineEdit("0.25")
|
||
self.rest_time_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.rest_time_input.setFixedWidth(60)
|
||
params_layout.addWidget(self.rest_time_input, 3, 1)
|
||
|
||
# C-rate for test
|
||
self.c_rate = 0.1
|
||
self.c_rate_label = QLabel("Test C-rate:")
|
||
self.c_rate_label.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.c_rate_label, 0, 2)
|
||
self.c_rate_input = QLineEdit("0.1")
|
||
self.c_rate_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.c_rate_input.setFixedWidth(40)
|
||
params_layout.addWidget(self.c_rate_input, 0, 3)
|
||
|
||
c_rate_note = QLabel("(e.g., 0.2 for C/5)")
|
||
c_rate_note.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(c_rate_note, 0, 4)
|
||
|
||
controls_layout.addWidget(params_frame, 1)
|
||
|
||
# Test conditions input
|
||
self.test_conditions_label = QLabel("Test Conditions/Chemistry:")
|
||
self.test_conditions_label.setStyleSheet(f"color: {self.fg_color};")
|
||
params_layout.addWidget(self.test_conditions_label, 4, 0)
|
||
self.test_conditions_input = QLineEdit("")
|
||
self.test_conditions_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
||
self.test_conditions_input.setFixedWidth(120)
|
||
params_layout.addWidget(self.test_conditions_input, 4, 1)
|
||
|
||
# Button frame
|
||
button_frame = QFrame()
|
||
button_frame.setFrameShape(QFrame.NoFrame)
|
||
button_layout = QVBoxLayout(button_frame)
|
||
button_layout.setContentsMargins(0, 0, 0, 0)
|
||
|
||
self.start_button = QPushButton("START TEST")
|
||
self.start_button.setStyleSheet(f"""
|
||
QPushButton {{
|
||
background-color: {self.accent_color};
|
||
color: {self.fg_color};
|
||
font-weight: bold;
|
||
padding: 6px;
|
||
border-radius: 4px;
|
||
}}
|
||
QPushButton:disabled {{
|
||
background-color: #4C566A;
|
||
color: #D8DEE9;
|
||
}}
|
||
""")
|
||
self.start_button.clicked.connect(self.start_test)
|
||
button_layout.addWidget(self.start_button)
|
||
|
||
self.stop_button = QPushButton("STOP TEST")
|
||
self.stop_button.setStyleSheet(f"""
|
||
QPushButton {{
|
||
background-color: {self.warning_color};
|
||
color: {self.fg_color};
|
||
font-weight: bold;
|
||
padding: 6px;
|
||
border-radius: 4px;
|
||
}}
|
||
QPushButton:disabled {{
|
||
background-color: #4C566A;
|
||
color: #D8DEE9;
|
||
}}
|
||
""")
|
||
self.stop_button.clicked.connect(self.stop_test)
|
||
self.stop_button.setEnabled(False)
|
||
button_layout.addWidget(self.stop_button)
|
||
|
||
# Continuous mode checkbox
|
||
self.continuous_mode_check = QCheckBox("Continuous Mode")
|
||
self.continuous_mode_check.setChecked(True)
|
||
self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")
|
||
button_layout.addWidget(self.continuous_mode_check)
|
||
|
||
controls_layout.addWidget(button_frame)
|
||
self.main_layout.addWidget(controls_frame)
|
||
|
||
# Plot area
|
||
self.setup_plot()
|
||
|
||
# Status bar
|
||
self.status_bar = self.statusBar()
|
||
self.status_bar.setStyleSheet(f"color: {self.fg_color};")
|
||
self.status_bar.showMessage("Ready")
|
||
|
||
# Apply dark theme
|
||
self.setStyleSheet(f"""
|
||
QMainWindow {{
|
||
background-color: {self.bg_color};
|
||
}}
|
||
QLabel {{
|
||
color: {self.fg_color};
|
||
}}
|
||
QLineEdit {{
|
||
background-color: #3B4252;
|
||
color: {self.fg_color};
|
||
border: 1px solid #4C566A;
|
||
border-radius: 3px;
|
||
padding: 2px;
|
||
}}
|
||
""")
|
||
|
||
def setup_plot(self):
|
||
"""Configure the matplotlib plot"""
|
||
self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color)
|
||
self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
|
||
self.ax = self.fig.add_subplot(111)
|
||
self.ax.set_facecolor('#3B4252')
|
||
|
||
# Set initial voltage range
|
||
voltage_padding = 0.2
|
||
min_voltage = max(0, 0.9 - voltage_padding)
|
||
max_voltage = 1.43 + voltage_padding
|
||
self.ax.set_ylim(min_voltage, max_voltage)
|
||
|
||
# Voltage plot
|
||
self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2)
|
||
self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
|
||
self.ax.tick_params(axis='y', labelcolor='#00BFFF')
|
||
|
||
# Current plot (right axis)
|
||
self.ax2 = self.ax.twinx()
|
||
current_padding = 0.05
|
||
test_current = 0.1 * 0.2 # Default values
|
||
max_current = test_current * 1.5
|
||
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
|
||
|
||
self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
|
||
self.ax2.set_ylabel("Current (A)", color='r')
|
||
self.ax2.tick_params(axis='y', labelcolor='r')
|
||
|
||
self.ax.set_xlabel('Time (s)', color=self.fg_color)
|
||
self.ax.set_title('Battery Test (CC)', color=self.fg_color)
|
||
self.ax.tick_params(axis='x', colors=self.fg_color)
|
||
self.ax.grid(True, color='#4C566A')
|
||
|
||
# Position legends
|
||
self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
|
||
self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))
|
||
|
||
# Embed plot
|
||
self.canvas = FigureCanvas(self.fig)
|
||
self.canvas.setStyleSheet(f"background-color: {self.bg_color};")
|
||
self.main_layout.addWidget(self.canvas, 1)
|
||
|
||
def init_device(self):
|
||
"""Initialize the ADALM1000 device with continuous measurement"""
|
||
try:
|
||
# Clean up any existing session
|
||
if hasattr(self, 'session'):
|
||
try:
|
||
self.session.end()
|
||
del self.session
|
||
except:
|
||
pass
|
||
|
||
time.sleep(1)
|
||
|
||
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
|
||
if not self.session.devices:
|
||
raise Exception("No ADALM1000 detected - check connections")
|
||
|
||
self.dev = self.session.devices[0]
|
||
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
||
self.dev.channels['A'].constant(0)
|
||
self.dev.channels['B'].constant(0)
|
||
|
||
self.session.start(0)
|
||
|
||
self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;")
|
||
self.connection_label.setText("Connected")
|
||
self.status_bar.showMessage("Device connected | Ready to measure")
|
||
self.session_active = True
|
||
self.start_button.setEnabled(True)
|
||
|
||
# Start measurement thread
|
||
self.measurement_thread = MeasurementThread(self.dev, self.interval)
|
||
self.measurement_thread.update_signal.connect(self.update_measurements)
|
||
self.measurement_thread.error_signal.connect(self.handle_device_error)
|
||
|
||
# Start the QThread directly (no need for threading.Thread)
|
||
self.measurement_thread.start()
|
||
|
||
except Exception as e:
|
||
self.handle_device_error(str(e))
|
||
|
||
@pyqtSlot(float, float, float)
|
||
def update_measurements(self, voltage, current, current_time):
|
||
"""Update measurements from the measurement thread"""
|
||
self.time_data.append(current_time)
|
||
self.voltage_data.append(voltage)
|
||
self.current_data.append(current)
|
||
|
||
# Update display
|
||
self.voltage_label.setText(f"{voltage:.4f}")
|
||
self.current_label.setText(f"{current:.4f}")
|
||
self.time_label.setText(self.format_time(current_time))
|
||
|
||
# Throttle plot updates to avoid recursive repaint
|
||
now = time.time()
|
||
if not hasattr(self, '_last_plot_update'):
|
||
self._last_plot_update = 0
|
||
|
||
if now - self._last_plot_update > 0.1: # Update plot max 10 times per second
|
||
self._last_plot_update = now
|
||
QTimer.singleShot(0, self.update_plot)
|
||
|
||
def update_status(self):
|
||
"""Update status information periodically"""
|
||
if self.test_running:
|
||
# Update capacity calculations if in test mode
|
||
if self.measuring and self.time_data:
|
||
current_time = time.time() - self.start_time
|
||
delta_t = current_time - self.last_update_time
|
||
self.last_update_time = current_time
|
||
|
||
if self.test_phase == "Discharge":
|
||
current_current = abs(self.current_data[-1])
|
||
self.capacity_ah += current_current * delta_t / 3600
|
||
self.capacity_label.setText(f"{self.capacity_ah:.4f}")
|
||
elif self.test_phase == "Charge":
|
||
current_current = abs(self.current_data[-1])
|
||
self.charge_capacity += current_current * delta_t / 3600
|
||
self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}")
|
||
|
||
def start_test(self):
|
||
"""Start the full battery test cycle"""
|
||
# Clean up any previous test
|
||
if hasattr(self, 'test_sequence_thread'):
|
||
self.test_sequence_thread.quit()
|
||
self.test_sequence_thread.wait(500)
|
||
if hasattr(self, 'test_sequence_worker'):
|
||
self.test_sequence_worker.deleteLater()
|
||
del self.test_sequence_thread
|
||
|
||
# Reset stop flag
|
||
self.request_stop = False
|
||
|
||
if not self.test_running:
|
||
try:
|
||
# Get parameters from UI
|
||
self.capacity = float(self.capacity_input.text())
|
||
self.charge_cutoff = float(self.charge_cutoff_input.text())
|
||
self.discharge_cutoff = float(self.discharge_cutoff_input.text())
|
||
self.rest_time = float(self.rest_time_input.text())
|
||
self.c_rate = float(self.c_rate_input.text())
|
||
|
||
# Validate inputs
|
||
if self.capacity <= 0:
|
||
raise ValueError("Battery capacity must be positive")
|
||
if self.charge_cutoff <= self.discharge_cutoff:
|
||
raise ValueError("Charge cutoff must be higher than discharge cutoff")
|
||
if self.c_rate <= 0:
|
||
raise ValueError("C-rate must be positive")
|
||
|
||
test_current = self.c_rate * self.capacity
|
||
if test_current > 0.2:
|
||
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
|
||
|
||
# Clear previous data
|
||
self.time_data.clear()
|
||
self.voltage_data.clear()
|
||
self.current_data.clear()
|
||
self.phase_data.clear()
|
||
self.start_time = time.time()
|
||
self.last_update_time = self.start_time
|
||
self.capacity_ah = 0.0
|
||
self.charge_capacity = 0.0
|
||
self.coulomb_efficiency = 0.0
|
||
self.cycle_count = 0
|
||
|
||
if hasattr(self, 'measurement_thread'):
|
||
self.measurement_thread.start_time = time.time()
|
||
|
||
# Reset plot with proper ranges
|
||
self.reset_plot()
|
||
|
||
# Generate filename and create log file
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
|
||
self.create_cycle_log_file()
|
||
|
||
# Start test
|
||
self.test_running = True
|
||
self.start_time = time.time()
|
||
self.last_update_time = time.time()
|
||
self.test_phase = "Initial Discharge"
|
||
self.phase_label.setText(self.test_phase)
|
||
|
||
self.start_button.setEnabled(False)
|
||
self.stop_button.setEnabled(True)
|
||
self.status_bar.showMessage(f"Test started | Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A")
|
||
|
||
# Start test sequence in a QThread
|
||
self.test_sequence_thread = QThread()
|
||
self.test_sequence_worker = TestSequenceWorker(
|
||
self.dev,
|
||
test_current,
|
||
self.charge_cutoff,
|
||
self.discharge_cutoff,
|
||
self.rest_time,
|
||
self.continuous_mode_check.isChecked(),
|
||
self # Pass reference to main window for callbacks
|
||
)
|
||
self.test_sequence_worker.moveToThread(self.test_sequence_thread)
|
||
|
||
# Connect signals
|
||
self.test_sequence_worker.update_phase.connect(self.update_test_phase)
|
||
self.test_sequence_worker.update_status.connect(self.status_bar.showMessage)
|
||
self.test_sequence_worker.test_completed.connect(self.finalize_test)
|
||
self.test_sequence_worker.error_occurred.connect(self.handle_test_error)
|
||
self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit)
|
||
self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater)
|
||
self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater)
|
||
|
||
# Start the thread and the worker's run method
|
||
self.test_sequence_thread.start()
|
||
QTimer.singleShot(0, self.test_sequence_worker.run)
|
||
|
||
# Start capacity calculation timer if not already running
|
||
if not self.status_timer.isActive():
|
||
self.status_timer.start(1000)
|
||
|
||
except Exception as e:
|
||
QMessageBox.critical(self, "Error", str(e))
|
||
# Ensure buttons are in correct state if error occurs
|
||
self.start_button.setEnabled(True)
|
||
self.stop_button.setEnabled(False)
|
||
|
||
def create_cycle_log_file(self):
|
||
"""Create a new log file for the current cycle"""
|
||
try:
|
||
# Close previous file if exists
|
||
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
|
||
try:
|
||
self.current_cycle_file.close()
|
||
except Exception as e:
|
||
print(f"Error closing previous log file: {e}")
|
||
|
||
# Ensure log directory exists
|
||
os.makedirs(self.log_dir, exist_ok=True)
|
||
|
||
if not os.access(self.log_dir, os.W_OK):
|
||
QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}")
|
||
return False
|
||
|
||
# Generate unique filename
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv")
|
||
|
||
# Open new file
|
||
try:
|
||
self.current_cycle_file = open(self.filename, 'w', newline='')
|
||
|
||
# Write header with test parameters
|
||
test_current = self.c_rate * self.capacity
|
||
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
|
||
|
||
self.current_cycle_file.write(f"# ADALM1000 Battery Test Log\n")
|
||
self.current_cycle_file.write(f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||
self.current_cycle_file.write(f"# Battery Capacity: {self.capacity} Ah\n")
|
||
self.current_cycle_file.write(f"# Test Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n")
|
||
self.current_cycle_file.write(f"# Charge Cutoff: {self.charge_cutoff} V\n")
|
||
self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n")
|
||
self.current_cycle_file.write(f"# Rest Time: {self.rest_time} hours\n")
|
||
self.current_cycle_file.write(f"# Test Conditions/Chemistry: {test_conditions}\n")
|
||
self.current_cycle_file.write("#\n")
|
||
|
||
# Write data header
|
||
self.log_writer = csv.writer(self.current_cycle_file)
|
||
self.log_writer.writerow([
|
||
"Time(s)", "Voltage(V)", "Current(A)", "Phase",
|
||
"Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
|
||
"Coulomb_Eff(%)", "Cycle"
|
||
])
|
||
self.log_buffer = []
|
||
return True
|
||
except Exception as e:
|
||
QMessageBox.critical(self, "Error", f"Failed to create log file: {e}")
|
||
return False
|
||
except Exception as e:
|
||
print(f"Error in create_cycle_log_file: {e}")
|
||
return False
|
||
|
||
def format_time(self, seconds):
|
||
"""Convert seconds to hh:mm:ss format"""
|
||
hours = int(seconds // 3600)
|
||
minutes = int((seconds % 3600) // 60)
|
||
seconds = int(seconds % 60)
|
||
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
|
||
|
||
def stop_test(self):
|
||
"""Request immediate stop of the test"""
|
||
if not self.test_running:
|
||
return
|
||
|
||
self.request_stop = True
|
||
self.test_running = False
|
||
self.measuring = False
|
||
self.test_phase = "Idle"
|
||
self.phase_label.setText(self.test_phase)
|
||
|
||
# Stop test sequence worker if it exists and is not already deleted
|
||
if hasattr(self, 'test_sequence_worker'):
|
||
try:
|
||
if not sip.isdeleted(self.test_sequence_worker):
|
||
self.test_sequence_worker.stop()
|
||
except:
|
||
pass
|
||
|
||
# Reset device channels
|
||
if hasattr(self, 'dev'):
|
||
try:
|
||
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.dev.channels['A'].constant(0)
|
||
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
||
except Exception as e:
|
||
print(f"Error resetting device: {e}")
|
||
|
||
# Clear all data buffers
|
||
self.time_data.clear()
|
||
self.voltage_data.clear()
|
||
self.current_data.clear()
|
||
self.phase_data.clear()
|
||
|
||
# Reset capacities
|
||
self.capacity_ah = 0.0
|
||
self.charge_capacity = 0.0
|
||
self.coulomb_efficiency = 0.0
|
||
|
||
# Reset plot
|
||
self.reset_plot()
|
||
|
||
# Reset elapsed time label
|
||
self.time_label.setText("00:00:00")
|
||
|
||
# Update UI
|
||
self.status_bar.showMessage("Test stopped - Ready for new test")
|
||
self.stop_button.setEnabled(False)
|
||
self.start_button.setEnabled(True)
|
||
|
||
def finalize_test(self):
|
||
"""Final cleanup after test completes or is stopped"""
|
||
try:
|
||
# 1. Stop any active measurement or test operations
|
||
self.measuring = False
|
||
self.test_running = False
|
||
|
||
# 2. Reset device to safe state
|
||
if hasattr(self, 'dev'):
|
||
try:
|
||
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
||
self.dev.channels['A'].constant(0)
|
||
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
||
except Exception as e:
|
||
print(f"Error resetting device in finalize: {e}")
|
||
|
||
# 3. Clean up test sequence thread safely
|
||
if hasattr(self, 'test_sequence_thread'):
|
||
try:
|
||
# Check if thread is still running
|
||
if self.test_sequence_thread.isRunning():
|
||
# First try to stop the worker if it exists
|
||
if hasattr(self, 'test_sequence_worker'):
|
||
try:
|
||
self.test_sequence_worker.stop()
|
||
except RuntimeError:
|
||
pass # Already deleted
|
||
|
||
# Quit the thread
|
||
self.test_sequence_thread.quit()
|
||
self.test_sequence_thread.wait(500)
|
||
except RuntimeError:
|
||
pass # Already deleted
|
||
except Exception as e:
|
||
print(f"Error stopping test sequence thread: {e}")
|
||
finally:
|
||
# Only try to delete if the object still exists
|
||
if hasattr(self, 'test_sequence_worker'):
|
||
try:
|
||
if not sip.isdeleted(self.test_sequence_worker):
|
||
self.test_sequence_worker.deleteLater()
|
||
except:
|
||
pass
|
||
|
||
# Remove references
|
||
if hasattr(self, 'test_sequence_thread'):
|
||
try:
|
||
if not sip.isdeleted(self.test_sequence_thread):
|
||
self.test_sequence_thread.deleteLater()
|
||
except:
|
||
pass
|
||
finally:
|
||
if hasattr(self, 'test_sequence_thread'):
|
||
del self.test_sequence_thread
|
||
|
||
# 4. Finalize log file
|
||
test_current = self.c_rate * self.capacity
|
||
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
|
||
|
||
if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
|
||
try:
|
||
# Write any buffered data
|
||
if hasattr(self, 'log_buffer') and self.log_buffer:
|
||
self.log_writer.writerows(self.log_buffer)
|
||
self.log_buffer.clear()
|
||
|
||
# Write test summary
|
||
self.current_cycle_file.write("\n# TEST SUMMARY\n")
|
||
self.current_cycle_file.write(f"# Test Parameters:\n")
|
||
self.current_cycle_file.write(f"# - Battery Capacity: {self.capacity} Ah\n")
|
||
self.current_cycle_file.write(f"# - Test Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n")
|
||
self.current_cycle_file.write(f"# - Charge Cutoff: {self.charge_cutoff} V\n")
|
||
self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n")
|
||
self.current_cycle_file.write(f"# - Test Conditions: {test_conditions}\n")
|
||
self.current_cycle_file.write(f"# Results:\n")
|
||
self.current_cycle_file.write(f"# - Cycles Completed: {self.cycle_count}\n")
|
||
self.current_cycle_file.write(f"# - Final Discharge Capacity: {self.capacity_ah:.4f} Ah\n")
|
||
self.current_cycle_file.write(f"# - Final Charge Capacity: {self.charge_capacity:.4f} Ah\n")
|
||
self.current_cycle_file.write(f"# - Coulombic Efficiency: {self.coulomb_efficiency:.1f}%\n")
|
||
|
||
self.current_cycle_file.close()
|
||
except Exception as e:
|
||
print(f"Error closing log file: {e}")
|
||
finally:
|
||
self.current_cycle_file = None
|
||
|
||
# 5. Reset UI and state
|
||
self.request_stop = False
|
||
self.start_button.setEnabled(True)
|
||
self.stop_button.setEnabled(False)
|
||
|
||
# 6. Show completion message if test wasn't stopped by user
|
||
if not self.request_stop:
|
||
message = (
|
||
f"Test completed | "
|
||
f"Cycle {self.cycle_count} | "
|
||
f"Capacity: {self.capacity_ah:.3f}Ah | "
|
||
f"Efficiency: {self.coulomb_efficiency:.1f}%"
|
||
)
|
||
self.status_bar.showMessage(message)
|
||
|
||
QMessageBox.information(
|
||
self,
|
||
"Test Completed",
|
||
f"Test completed successfully.\n\n"
|
||
f"Test Parameters:\n"
|
||
f"- Capacity: {self.capacity} Ah\n"
|
||
f"- Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n"
|
||
f"- Charge Cutoff: {self.charge_cutoff} V\n"
|
||
f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
|
||
f"- Conditions: {test_conditions}\n\n"
|
||
f"Results:\n"
|
||
f"- Cycles: {self.cycle_count}\n"
|
||
f"- Discharge capacity: {self.capacity_ah:.3f}Ah\n"
|
||
f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%"
|
||
)
|
||
|
||
except Exception as e:
|
||
print(f"Error in finalize_test: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
# Ensure we don't leave the UI in a locked state
|
||
self.start_button.setEnabled(True)
|
||
self.stop_button.setEnabled(False)
|
||
self.status_bar.showMessage("Error during test finalization")
|
||
|
||
def reset_plot(self):
|
||
"""Reset the plot completely for a new test"""
|
||
self.line_voltage.set_data([], [])
|
||
self.line_current.set_data([], [])
|
||
|
||
self.time_data.clear()
|
||
self.voltage_data.clear()
|
||
self.current_data.clear()
|
||
|
||
voltage_padding = 0.2
|
||
min_voltage = max(0, self.discharge_cutoff - voltage_padding)
|
||
max_voltage = self.charge_cutoff + voltage_padding
|
||
|
||
# Reset X and Y axes
|
||
self.ax.set_xlim(0, 10) # Start at 0 with 10s initial window
|
||
self.ax.set_ylim(min_voltage, max_voltage)
|
||
self.ax2.set_xlim(0, 10) # Sync twin axis
|
||
|
||
current_padding = 0.05
|
||
test_current = self.c_rate * self.capacity
|
||
max_current = test_current * 1.5
|
||
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
|
||
|
||
self.canvas.draw()
|
||
|
||
def write_cycle_summary(self):
|
||
"""Write cycle summary to the current cycle's log file"""
|
||
if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file:
|
||
return
|
||
|
||
summary_line = (
|
||
f"Cycle {self.cycle_count} Summary - "
|
||
f"Discharge={self.capacity_ah:.4f}Ah, "
|
||
f"Charge={self.charge_capacity:.4f}Ah, "
|
||
f"Efficiency={self.coulomb_efficiency:.1f}%"
|
||
)
|
||
|
||
try:
|
||
if self.log_buffer:
|
||
self.log_writer.writerows(self.log_buffer)
|
||
self.log_buffer.clear()
|
||
self.current_cycle_file.write(summary_line + "\n")
|
||
self.current_cycle_file.flush()
|
||
except Exception as e:
|
||
print(f"Error writing cycle summary: {e}")
|
||
|
||
def update_plot(self):
|
||
"""More reliable plotting with better error handling"""
|
||
try:
|
||
# Create local copies safely
|
||
with self.plot_mutex:
|
||
if not self.time_data or not self.voltage_data or not self.current_data:
|
||
return
|
||
|
||
if len(self.time_data) != len(self.voltage_data) or len(self.time_data) != len(self.current_data):
|
||
# Find the minimum length to avoid mismatch
|
||
min_len = min(len(self.time_data), len(self.voltage_data), len(self.current_data))
|
||
x_data = np.array(self.time_data[-min_len:])
|
||
y1_data = np.array(self.voltage_data[-min_len:])
|
||
y2_data = np.array(self.current_data[-min_len:])
|
||
else:
|
||
x_data = np.array(self.time_data)
|
||
y1_data = np.array(self.voltage_data)
|
||
y2_data = np.array(self.current_data)
|
||
|
||
# Update plot data
|
||
self.line_voltage.set_data(x_data, y1_data)
|
||
self.line_current.set_data(x_data, y2_data)
|
||
|
||
# Auto-scale when needed
|
||
if len(x_data) > 0 and x_data[-1] > self.ax.get_xlim()[1] * 0.8:
|
||
self.auto_scale_axes()
|
||
|
||
# Force redraw
|
||
self.canvas.draw_idle()
|
||
|
||
except Exception as e:
|
||
print(f"Plot error: {e}")
|
||
# Reset plot on error
|
||
self.line_voltage.set_data([], [])
|
||
self.line_current.set_data([], [])
|
||
self.canvas.draw_idle()
|
||
|
||
def auto_scale_axes(self):
|
||
"""Auto-scale plot axes with appropriate padding and strict boundaries"""
|
||
if not self.time_data:
|
||
return
|
||
|
||
min_time = 0
|
||
max_time = self.time_data[-1]
|
||
current_xlim = self.ax.get_xlim()
|
||
|
||
if max_time > current_xlim[1] * 0.95:
|
||
new_max = max_time * 1.05
|
||
self.ax.set_xlim(min_time, new_max)
|
||
self.ax2.set_xlim(min_time, new_max)
|
||
|
||
voltage_padding = 0.2
|
||
if self.voltage_data:
|
||
min_voltage = max(0, min(self.voltage_data) - voltage_padding)
|
||
max_voltage = min(5.0, max(self.voltage_data) + voltage_padding)
|
||
current_ylim = self.ax.get_ylim()
|
||
if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1):
|
||
self.ax.set_ylim(min_voltage, max_voltage)
|
||
|
||
current_padding = 0.05
|
||
if self.current_data:
|
||
min_current = max(-0.25, min(self.current_data) - current_padding)
|
||
max_current = min(0.25, max(self.current_data) + current_padding)
|
||
current_ylim2 = self.ax2.get_ylim()
|
||
if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02):
|
||
self.ax2.set_ylim(min_current, max_current)
|
||
|
||
@pyqtSlot(str)
|
||
def handle_device_error(self, error):
|
||
"""Handle device connection errors"""
|
||
error_msg = str(error)
|
||
print(f"Device error: {error_msg}")
|
||
|
||
if hasattr(self, 'session'):
|
||
try:
|
||
if self.session_active:
|
||
self.session.end()
|
||
del self.session
|
||
except Exception as e:
|
||
print(f"Error cleaning up session: {e}")
|
||
|
||
self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;")
|
||
self.connection_label.setText("Disconnected")
|
||
self.status_bar.showMessage(f"Device error: {error_msg}")
|
||
|
||
self.session_active = False
|
||
self.test_running = False
|
||
self.continuous_mode = False
|
||
self.measuring = False
|
||
|
||
self.start_button.setEnabled(False)
|
||
self.stop_button.setEnabled(False)
|
||
|
||
self.time_data.clear()
|
||
self.voltage_data.clear()
|
||
self.current_data.clear()
|
||
|
||
@pyqtSlot(str)
|
||
def update_test_phase(self, phase_text):
|
||
"""Update the test phase display"""
|
||
self.test_phase = phase_text
|
||
self.phase_label.setText(phase_text)
|
||
|
||
# Update log if available
|
||
if hasattr(self, 'log_buffer'):
|
||
current_time = time.time() - self.start_time
|
||
self.log_buffer.append([
|
||
f"{current_time:.3f}",
|
||
"",
|
||
"",
|
||
phase_text,
|
||
f"{self.capacity_ah:.4f}",
|
||
f"{self.charge_capacity:.4f}",
|
||
f"{self.coulomb_efficiency:.1f}" if hasattr(self, 'coulomb_efficiency') else "0.0",
|
||
f"{self.cycle_count}"
|
||
])
|
||
|
||
@pyqtSlot(str)
|
||
def handle_test_error(self, error_msg):
|
||
"""Handle errors from the test sequence with complete cleanup"""
|
||
try:
|
||
# 1. Notify user
|
||
QMessageBox.critical(self, "Test Error",
|
||
f"An error occurred:\n{error_msg}\n\nAttempting to recover...")
|
||
|
||
# 2. Stop all operations
|
||
self.stop_test()
|
||
|
||
# 3. Reset UI elements
|
||
if hasattr(self, 'line_voltage'):
|
||
try:
|
||
self.line_voltage.set_data([], [])
|
||
self.line_current.set_data([], [])
|
||
self.ax.set_xlim(0, 1)
|
||
self.ax2.set_xlim(0, 1)
|
||
self.canvas.draw()
|
||
except Exception as plot_error:
|
||
print(f"Plot reset error: {plot_error}")
|
||
|
||
# 4. Update status
|
||
self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...")
|
||
self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")
|
||
|
||
# 5. Attempt recovery
|
||
QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect
|
||
|
||
except Exception as e:
|
||
print(f"Error in error handler: {e}")
|
||
# Fallback - restart application?
|
||
QMessageBox.critical(self, "Fatal Error",
|
||
"The application needs to restart due to an unrecoverable error")
|
||
QTimer.singleShot(1000, self.close)
|
||
|
||
def attempt_reconnect(self):
|
||
"""Attempt to reconnect automatically"""
|
||
QMessageBox.critical(
|
||
self,
|
||
"Device Connection Error",
|
||
"Could not connect to ADALM1000\n\n"
|
||
"1. Check USB cable connection\n"
|
||
"2. The device will attempt to reconnect automatically"
|
||
)
|
||
|
||
QTimer.singleShot(1000, self.reconnect_device)
|
||
|
||
def reconnect_device(self):
|
||
"""Reconnect the device with proper cleanup"""
|
||
self.status_bar.showMessage("Attempting to reconnect...")
|
||
|
||
if hasattr(self, 'session'):
|
||
try:
|
||
if self.session_active:
|
||
self.session.end()
|
||
del self.session
|
||
except:
|
||
pass
|
||
|
||
self.test_running = False
|
||
self.continuous_mode = False
|
||
self.measuring = False
|
||
|
||
if hasattr(self, 'measurement_thread'):
|
||
self.measurement_thread.stop()
|
||
|
||
time.sleep(1.5)
|
||
|
||
try:
|
||
self.init_device()
|
||
if self.session_active:
|
||
self.status_bar.showMessage("Reconnected successfully")
|
||
return
|
||
except Exception as e:
|
||
print(f"Reconnect failed: {e}")
|
||
|
||
self.status_bar.showMessage("Reconnect failed - will retry...")
|
||
QTimer.singleShot(2000, self.reconnect_device)
|
||
|
||
def closeEvent(self, event):
|
||
"""Clean up on window close"""
|
||
self.test_running = False
|
||
self.measuring = False
|
||
self.session_active = False
|
||
|
||
# Stop measurement thread
|
||
if hasattr(self, 'measurement_thread'):
|
||
self.measurement_thread.stop()
|
||
|
||
# Stop test sequence thread
|
||
if hasattr(self, 'test_sequence_thread'):
|
||
if hasattr(self, 'test_sequence_worker'):
|
||
self.test_sequence_worker.stop()
|
||
self.test_sequence_thread.quit()
|
||
self.test_sequence_thread.wait(500)
|
||
|
||
# Clean up device session
|
||
if hasattr(self, 'session') and self.session:
|
||
try:
|
||
self.session.end()
|
||
except Exception as e:
|
||
print(f"Error ending session: {e}")
|
||
|
||
event.accept()
|
||
|
||
if __name__ == "__main__":
|
||
app = QApplication([])
|
||
try:
|
||
window = BatteryTester()
|
||
window.show()
|
||
app.exec_()
|
||
except Exception as e:
|
||
QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}") |