1280 lines
52 KiB
Python
1280 lines
52 KiB
Python
# -*- coding: utf-8 -*-
|
|
import os
|
|
import time
|
|
import csv
|
|
from datetime import datetime
|
|
import numpy as np
|
|
|
|
# Suppress warnings
|
|
os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false'
|
|
os.environ['LIBUSB_DEBUG'] = '0'
|
|
|
|
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
|
|
QGridLayout, QLabel, QPushButton, QLineEdit, QFrame,
|
|
QCheckBox, QMessageBox, QFileDialog, QProgressBar)
|
|
from PyQt5.QtCore import (Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread,
|
|
QMutex, QMutexLocker)
|
|
from PyQt5.QtGui import QColor, QPalette
|
|
from collections import deque
|
|
import pysmu
|
|
import matplotlib
|
|
matplotlib.use('Qt5Agg')
|
|
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
|
|
from matplotlib.figure import Figure
|
|
|
|
|
|
class DeviceDisconnectedError(Exception):
    """Raised to signal that the measurement device has been disconnected."""
|
|
|
|
|
|
class MeasurementThread(QThread):
    """Background thread that continuously samples voltage and current.

    Every loop iteration reads ``filter_window_size`` samples from the device,
    averages them, feeds the averages through a moving-average window and
    emits ``update_signal`` with the filtered voltage, filtered current and
    the time elapsed since ``start_time``.
    """

    update_signal = pyqtSignal(float, float, float)  # voltage, current, timestamp
    error_signal = pyqtSignal(str)
    status_signal = pyqtSignal(str)

    # Timeout (ms) handed to ``device.read``. ``stop`` derives its grace
    # period from this so a healthy thread is never force-terminated.
    READ_TIMEOUT_MS = 500

    def __init__(self, device: object, interval: float = 0.1, start_time: "float | None" = None):
        """Initialize measurement thread.

        Args:
            device: pysmu device whose channels ``'A'`` and ``'B'`` are read.
            interval: Target time between updates in seconds (minimum 0.05).
            start_time: Reference epoch time for emitted timestamps; the
                current time is used when ``None``.
        """
        super().__init__()
        self._mutex = QMutex()
        self.data_mutex = QMutex()
        self.device = device
        self.interval = max(0.05, interval)
        self._running = False
        self.filter_window_size = 10
        # Compare against None so an explicit 0.0 reference time is honoured.
        self.start_time = time.time() if start_time is None else start_time
        self.last_update_time = self.start_time

        # Configure channels
        self.device.channels['A'].mode = pysmu.Mode.SIMV  # Channel A for current
        self.device.channels['B'].mode = pysmu.Mode.HI_Z  # Channel B for voltage

    def run(self):
        """Measurement loop for both voltage and current."""
        self._running = True
        voltage_window = deque()
        current_window = deque()
        self.status_signal.emit("Measurement started")
        self.last_update_time = time.time()

        while self._running:
            try:
                samples = self.device.read(self.filter_window_size, timeout=self.READ_TIMEOUT_MS)
                if not samples or len(samples) < self.filter_window_size:
                    continue

                with QMutexLocker(self._mutex):
                    # Each sample is indexed [channel][quantity]:
                    # voltage from channel B, current from channel A.
                    raw_voltage = np.mean([s[1][0] for s in samples])  # Channel B voltage
                    raw_current = np.mean([s[0][1] for s in samples])  # Channel A current

                    # Apply moving average filter
                    if len(voltage_window) >= self.filter_window_size:
                        voltage_window.popleft()
                    voltage_window.append(raw_voltage)

                    if len(current_window) >= self.filter_window_size:
                        current_window.popleft()
                    current_window.append(raw_current)

                    voltage = np.mean(voltage_window)
                    current = np.mean(current_window)
                    current_time = time.time() - self.start_time

                self.update_signal.emit(voltage, current, current_time)

                # Sleep for whatever is left of the interval (at least 10 ms).
                elapsed = time.time() - self.last_update_time
                sleep_time = max(0.01, self.interval - elapsed)
                time.sleep(sleep_time)
                self.last_update_time = time.time()

            except Exception as e:
                self.error_signal.emit(f"Measurement error: {str(e)}")
                break

        self.status_signal.emit("Measurement stopped")
        self._running = False

    def stop(self):
        """Ask the loop to exit and wait for it; terminate only as a last resort.

        The loop has no Qt event loop, so ``quit()`` would have no effect.
        The grace period covers one full blocking read plus one sleep
        interval, so ``terminate()`` is reached only if the thread is stuck.
        """
        self._running = False
        if not self.isRunning():
            return

        grace_ms = self.READ_TIMEOUT_MS + int(self.interval * 1000) + 200
        if not self.wait(grace_ms):
            self.terminate()
            # Qt recommends waiting after terminate() before touching the thread.
            self.wait(500)

    def is_active(self) -> bool:
        """Return True while the loop runs and has updated within the last 2 s."""
        with QMutexLocker(self._mutex):
            return self._running and (time.time() - self.last_update_time < 2.0)
|
|
|
|
|
|
class TestSequenceThread(QThread):
    """Worker thread that drives the charge / rest / discharge test sequence.

    Test parameters are read from the widgets of the owning window
    (``self.parent``). The latest voltage and current are taken from
    ``parent.test_data``; charge and discharge capacity are accumulated on
    the parent object.
    """

    progress_updated = pyqtSignal(float, str)  # progress, phase
    cycle_completed = pyqtSignal(int, float, float, float)  # cycle, discharge, charge, efficiency
    error_occurred = pyqtSignal(str)

    def __init__(self, parent: QObject):
        """Initialize test sequence thread.

        Args:
            parent: The main window; used both as the Qt parent and as the
                source of test parameters, device handle and shared data.
        """
        super().__init__(parent)  # Pass parent to QThread
        # NOTE(review): this attribute shadows QObject.parent(); the name is
        # kept because the rest of this class accesses it as an attribute.
        self.parent = parent
        self._mutex = QMutex()  # For thread operation control
        self.data_mutex = QMutex()  # For data protection
        self._running = False

    def stop(self):
        """Ask the sequence to end at its next stop check (non-blocking).

        Callers that need the thread to be finished should ``wait()`` on it.
        """
        self._running = False

    def _should_stop(self) -> bool:
        """Return True once a stop was requested on the thread or the parent."""
        with QMutexLocker(self._mutex):
            return (not self._running) or bool(self.parent.request_stop)

    def run(self):
        """Execute the complete test sequence with configurable modes."""
        self._running = True

        try:
            test_current = float(self.parent.c_rate_input.text()) * float(self.parent.capacity_input.text())
            charge_cutoff = float(self.parent.charge_cutoff_input.text())
            discharge_cutoff = float(self.parent.discharge_cutoff_input.text())
            cv_cutoff_current = float(self.parent.cv_cutoff_input.text())

            while not self._should_stop() and (self.parent.continuous_mode or self.parent.cycle_count == 0):
                self.parent.cycle_count += 1
                cycle = self.parent.cycle_count

                # CC Charge phase
                self._execute_phase("charge", test_current, charge_cutoff,
                                    discharge_cutoff, charge_cutoff)
                # Check after every phase so a stop request never lets the
                # next phase drive current into the battery.
                if self._should_stop():
                    break

                # Optional CV Charge phase
                if self.parent.cv_mode_enabled:
                    self._execute_cv_charge(charge_cutoff, test_current, cv_cutoff_current)
                    if self._should_stop():
                        break

                # Rest after charge
                self._execute_rest("post-charge")
                if self._should_stop():
                    break

                # Discharge phase (CC)
                self._execute_phase("discharge", test_current, discharge_cutoff,
                                    discharge_cutoff, charge_cutoff)

                # Rest after discharge is only needed when another cycle follows.
                if self.parent.continuous_mode and not self._should_stop():
                    self._execute_rest("post-discharge")

                # Report the cycle in single-cycle mode as well; the loop
                # condition ends the sequence afterwards.
                if self.parent.charge_capacity > 0:
                    efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100
                    self.cycle_completed.emit(cycle, self.parent.capacity_ah,
                                              self.parent.charge_capacity, efficiency)

        except Exception as e:
            # Single reporting point: the phase helpers raise (with a phase
            # prefix in the message) instead of emitting themselves.
            self.error_occurred.emit(str(e))
        finally:
            self._running = False

    def _execute_rest(self, label: str):
        """Wait for the configured rest time, reporting progress.

        Args:
            label: Short description of the rest (e.g. ``"post-charge"``),
                shown in the emitted phase name.

        The duration is read from ``parent.rest_time_input`` (hours). The
        device is not touched here: both phase helpers switch channel A to
        HI_Z before returning.
        """
        rest_seconds = max(0.0, float(self.parent.rest_time_input.text()) * 3600.0)
        phase_name = f"Rest ({label})"
        self.progress_updated.emit(0.0, phase_name)

        started = time.time()
        while not self._should_stop():
            elapsed = time.time() - started
            if elapsed >= rest_seconds:
                self.progress_updated.emit(1.0, phase_name)
                break
            self.progress_updated.emit(elapsed / rest_seconds, phase_name)
            time.sleep(0.1)

    def _measure_channel_a_current(self, count: int = 50, timeout: int = 2000) -> float:
        """Return the mean channel-A current over ``count`` samples (NaN if none)."""
        samples = self.parent.dev.read(count, timeout=timeout)
        if not samples:
            return float('nan')
        return float(np.mean([s[0][1] for s in samples]))

    def _execute_cv_charge(self, target_voltage: float, current_limit: float, cutoff_current: float):
        """Execute constant voltage charge phase.

        Args:
            target_voltage: Voltage to hold on channel A.
            current_limit: CC test current; used only to scale the progress.
            cutoff_current: The phase ends once |current| falls to this value.

        Raises:
            RuntimeError: Wraps any failure, prefixed with ``"CV Charge error"``.
        """
        try:
            self.progress_updated.emit(0.0, "CV Charge")

            # Constant voltage needs source-voltage/measure-current (SVMI).
            # In SIMV the setpoint would be interpreted as a current.
            self.parent.dev.channels['A'].mode = pysmu.Mode.SVMI
            self.parent.dev.channels['A'].constant(target_voltage)

            last_update = time.time()

            while not self._should_stop():
                if not self.parent.test_data['voltage']:
                    time.sleep(0.1)
                    continue

                current_current = abs(self.parent.test_data['current'][-1])
                current_time = time.time()
                delta_t = current_time - last_update
                last_update = current_time

                # Update charge capacity (A * s -> Ah)
                self.parent.charge_capacity += current_current * delta_t / 3600

                # Progress grows as the current tapers off from the CC level
                if current_limit > 0:
                    progress = 1 - (current_current / current_limit)
                else:
                    progress = 0.0
                progress = max(0.0, min(1.0, progress))
                self.progress_updated.emit(progress, "CV Charge")

                # Check termination condition
                if current_current <= cutoff_current:
                    break

                time.sleep(0.1)

        except Exception as e:
            raise RuntimeError(f"CV Charge error: {str(e)}") from e
        finally:
            self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z

    def _execute_phase(self, phase: str, current: float, target_voltage: float, discharge_cutoff: float, charge_cutoff: float):
        """Run one constant-current phase until its cutoff voltage is reached.

        Args:
            phase: ``"charge"`` sources ``+current``; anything else sources
                ``-current`` (discharge).
            current: Magnitude of the test current in amperes.
            target_voltage: Voltage that ends the phase.
            discharge_cutoff: Lower voltage bound, used to scale charge progress.
            charge_cutoff: Upper voltage bound, used to scale discharge progress.

        Raises:
            RuntimeError: Wraps any failure, prefixed with the phase name.
        """
        is_charge = phase == "charge"
        signed_current = current if is_charge else -current

        try:
            print(f"\n=== Starting {phase} phase ===")
            print(f"Device session active: {self.parent.session_active}")
            print(f"Channel A mode before: {self.parent.dev.channels['A'].mode}")

            channel = self.parent.dev.channels['A']

            # Reset channel first with longer delay
            channel.mode = pysmu.Mode.HI_Z
            channel.constant(0)
            time.sleep(1.0)  # Increased settling time

            # Configure for current mode with explicit setup
            channel.mode = pysmu.Mode.SIMV
            time.sleep(0.5)  # Additional delay for mode change

            # Set current with proper polarity
            if is_charge:
                print(f"Starting CHARGE at {current}A to {target_voltage}V")
            else:
                print(f"Starting DISCHARGE at {-current}A to {target_voltage}V")
            channel.constant(signed_current)

            time.sleep(1.0)  # Allow more settling time

            print(f"Set constant current to: {current} A on channel A")

            # Verify current setting with more samples
            measured_current = self._measure_channel_a_current()
            print(f"Requested {current}A, Measured {measured_current:.6f}A")

            # Additional debug info
            print(f"Channel A mode: {self.parent.dev.channels['A'].mode}")

            if abs(measured_current) < 0.001:  # If current is still near zero
                print("Warning: Current not being applied - checking connection")
                # Cycle the channel mode and apply the setpoint again
                channel.mode = pysmu.Mode.HI_Z
                time.sleep(0.5)
                channel.mode = pysmu.Mode.SIMV
                time.sleep(0.5)
                channel.constant(signed_current)
                time.sleep(1.0)

                # Re-measure
                measured_current = self._measure_channel_a_current()
                print(f"After reset: Requested {current}A, Measured {measured_current:.6f}A")

                if abs(measured_current) < 0.001:  # Still no current
                    raise RuntimeError(f"Failed to apply {current}A - check device connection and battery")

            # Voltage span used to scale progress; zero is guarded below.
            if is_charge:
                span = target_voltage - discharge_cutoff
            else:
                span = charge_cutoff - target_voltage

            # Main phase loop
            last_update = time.time()

            while not self._should_stop():
                if not self.parent.test_data['voltage']:
                    time.sleep(0.1)
                    continue

                current_voltage = self.parent.test_data['voltage'][-1]
                current_time = time.time()
                delta_t = current_time - last_update
                last_update = current_time

                # Update capacity (A * s -> Ah)
                delta_ah = abs(self.parent.test_data['current'][-1]) * delta_t / 3600
                if is_charge:
                    self.parent.charge_capacity += delta_ah
                    travelled = current_voltage - discharge_cutoff
                else:
                    self.parent.capacity_ah += delta_ah
                    travelled = charge_cutoff - current_voltage

                progress = travelled / span if span else 0.0
                progress = max(0.0, min(1.0, progress))
                self.progress_updated.emit(progress, f"{phase.capitalize()}ing")

                # Check termination conditions
                if ((is_charge and current_voltage >= target_voltage) or
                        (phase == "discharge" and current_voltage <= target_voltage)):
                    break

                time.sleep(0.1)

        except AttributeError as e:
            if '_constant' not in str(e):
                raise RuntimeError(f"{phase.capitalize()} error: {str(e)}") from e
            # This specific failure is tolerated: the phase ends here without
            # an error and the sequence moves on.
            print("Warning: Internal attribute check failed, but current setting should still work")
        except Exception as e:
            raise RuntimeError(f"{phase.capitalize()} error: {str(e)}") from e
        finally:
            # Ensure channel is reset even if error occurs
            try:
                self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z
                self.parent.dev.channels['A'].constant(0)
            except Exception:
                pass  # Best effort cleanup
|
|
|
|
class BatteryTester(QMainWindow):
|
|
"""Main application window for battery capacity testing."""
|
|
|
|
error_signal = pyqtSignal(str)
|
|
|
|
def __init__(self):
    """Build the main window, set default test state and schedule device start-up."""
    super().__init__()
    self.error_signal.connect(self.handle_device_error)

    # Locks: one for general thread safety, one guarding the test data
    self._mutex = QMutex()
    self.data_mutex = QMutex()

    # One growing series per recorded quantity.
    # 'mode' tracks the operation mode (CC, CV, ...); 'phase' tracks
    # charge/discharge/rest.
    series_names = ('time', 'voltage', 'current', 'mode', 'phase', 'capacity', 'energy')
    self.test_data = {name: deque() for name in series_names}

    # Test bookkeeping
    self.test_phase = "Ready"
    self.current_mode = "None"  # current operation mode
    self.capacity_ah = 0.0  # discharge capacity
    self.charge_capacity = 0.0  # charge capacity
    self.energy_wh = 0.0  # energy in watt-hours
    self.coulomb_efficiency = 0.0
    self.cycle_count = 0
    self.cycle_data = []  # per-cycle statistics

    # Colour scheme
    self.bg_color = QColor(46, 52, 64)
    self.fg_color = QColor(216, 222, 233)
    self.accent_color = QColor(94, 129, 172)
    self.warning_color = QColor(191, 97, 106)
    self.success_color = QColor(163, 190, 140)
    self.cv_color = QColor(143, 188, 187)  # used for CV mode

    # Device / test status flags
    self.session_active = False
    self.measuring = False
    self.test_running = False
    self.continuous_mode = False
    self.cv_mode_enabled = False  # CV charge mode flag
    self.request_stop = False
    self.interval = 0.1  # measurement interval

    # Worker threads are created on demand
    self.measurement_thread = None
    self.test_thread = None

    # Logging: make sure the log directory exists
    self.log_dir = os.path.expanduser("~/adalm1000/logs")
    os.makedirs(self.log_dir, exist_ok=True)
    self.start_time = time.time()
    self.log_buffer = []
    self.current_cycle_file = None

    # Build all widgets
    self._setup_ui()

    # Connect to the device shortly after construction to avoid USB issues
    QTimer.singleShot(100, self.safe_init_device)
|
|
|
|
def _setup_ui(self):
    """Configure the user interface.

    Builds, top to bottom: header (title, connection state, connect
    button), measurement read-outs, progress bar, parameter inputs with the
    start/stop controls, the plot and the status line.
    """
    self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)")
    self.resize(1000, 800)
    self.setMinimumSize(800, 700)

    # Set color palette
    palette = self.palette()
    for role, color in (
        (QPalette.Window, self.bg_color),
        (QPalette.WindowText, self.fg_color),
        (QPalette.Base, QColor(59, 66, 82)),
        (QPalette.Text, self.fg_color),
        (QPalette.Button, self.accent_color),
        (QPalette.ButtonText, self.fg_color),
    ):
        palette.setColor(role, color)
    self.setPalette(palette)

    # Main widget and layout
    self.central_widget = QWidget()
    self.setCentralWidget(self.central_widget)
    self.main_layout = QVBoxLayout(self.central_widget)
    self.main_layout.setContentsMargins(10, 10, 10, 10)
    self.main_layout.setSpacing(10)

    # Header section
    header_frame = QWidget()
    header_layout = QHBoxLayout(header_frame)
    header_layout.setContentsMargins(0, 0, 0, 0)

    self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)")
    self.title_label.setStyleSheet(f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};")
    header_layout.addWidget(self.title_label, 1)

    # Connection indicator
    self.connection_label = QLabel("Disconnected")
    header_layout.addWidget(self.connection_label)

    self.status_light = QLabel()
    self.status_light.setFixedSize(20, 20)
    self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
    header_layout.addWidget(self.status_light)

    # Reconnect button
    self.reconnect_btn = QPushButton("Connect")
    self.reconnect_btn.clicked.connect(self.reconnect_device)
    header_layout.addWidget(self.reconnect_btn)

    self.main_layout.addWidget(header_frame)

    # Measurement display
    display_frame = QFrame()
    display_frame.setFrameShape(QFrame.StyledPanel)
    display_layout = QGridLayout(display_frame)
    display_layout.setHorizontalSpacing(15)
    display_layout.setVerticalSpacing(8)

    # (caption, unit, attribute that receives the value label)
    measurement_labels = [
        ("Voltage", "V", "voltage_label"),
        ("Current", "A", "current_label"),
        ("Test Phase", "", "phase_label"),
        ("Elapsed Time", "s", "time_label"),
        ("Discharge Capacity", "Ah", "capacity_label"),
        ("Charge Capacity", "Ah", "charge_capacity_label"),
        ("Coulomb Efficiency", "%", "efficiency_label"),
        ("Cycle Count", "", "cycle_label"),
    ]

    self.value_labels = {}  # caption -> value label, for easy updates

    for i, (label, unit, attr_name) in enumerate(measurement_labels):
        row = i // 2
        col = (i % 2) * 3  # Each block is 3 columns: label | value | unit

        name_label = QLabel(label + ":")
        name_label.setStyleSheet("font-size: 11px;")
        display_layout.addWidget(name_label, row, col)

        value_label = QLabel("0.000")
        value_label.setStyleSheet("font-size: 12px; font-weight: bold; min-width: 60px;")
        display_layout.addWidget(value_label, row, col + 1)

        unit_label = QLabel(unit)
        unit_label.setStyleSheet("font-size: 11px; color: gray;")
        display_layout.addWidget(unit_label, row, col + 2)

        # Keep both the dictionary entry and the dedicated attribute
        self.value_labels[label] = value_label
        setattr(self, attr_name, value_label)

    self.main_layout.addWidget(display_frame)

    # Progress bar
    self.progress_bar = QProgressBar()
    self.progress_bar.setRange(0, 100)
    self.progress_bar.setTextVisible(False)
    self.progress_bar.setStyleSheet(f"""
        QProgressBar {{
            border: 1px solid {self.fg_color.name()};
            border-radius: 5px;
            text-align: center;
        }}
        QProgressBar::chunk {{
            background-color: {self.accent_color.name()};
        }}
    """)
    self.main_layout.addWidget(self.progress_bar)

    # Control section
    controls_frame = QWidget()
    controls_layout = QHBoxLayout(controls_frame)
    controls_layout.setContentsMargins(0, 0, 0, 0)

    # Parameters frame
    params_frame = QFrame()
    params_frame.setFrameShape(QFrame.StyledPanel)
    params_layout = QGridLayout(params_frame)

    # (attribute, caption, default, width, tooltip, row, caption column).
    # Listed in creation order; grid position comes from row/column.
    parameter_inputs = [
        ("cv_cutoff_input", "CV Cutoff Current (A):", "0.02", 80,
         "Current at which CV charging should stop", 4, 0),  # 20mA default
        ("capacity_input", "Battery Capacity (Ah):", "0.2", 80,
         "Nominal capacity of the battery in Amp-hours", 0, 0),
        ("charge_cutoff_input", "Charge Cutoff (V):", "1.43", 80,
         "Voltage at which charging should stop", 1, 0),
        ("discharge_cutoff_input", "Discharge Cutoff (V):", "0.9", 80,
         "Voltage at which discharging should stop", 2, 0),
        ("rest_time_input", "Rest Time (hours):", "0.25", 80,
         "Rest period between charge/discharge cycles", 3, 0),
        ("c_rate_input", "Test C-Rate:", "0.1", 60,
         "Charge/discharge rate relative to battery capacity (e.g., 0.2 for C/5)", 0, 2),
    ]
    for attr_name, caption, default, width, tooltip, row, col in parameter_inputs:
        params_layout.addWidget(QLabel(caption), row, col)
        line_edit = QLineEdit(default)
        line_edit.setFixedWidth(width)
        line_edit.setToolTip(tooltip)
        params_layout.addWidget(line_edit, row, col + 1)
        setattr(self, attr_name, line_edit)

    params_layout.addWidget(QLabel("(e.g., 0.2 for C/5)"), 0, 4)

    controls_layout.addWidget(params_frame, 1)

    # Button area
    button_frame = QWidget()
    button_layout = QVBoxLayout(button_frame)
    button_layout.setContentsMargins(0, 0, 0, 0)

    def action_button_css(color_name):
        # Shared style of the start/stop buttons; only the colour differs.
        return f"""
            QPushButton {{
                background-color: {color_name};
                font-weight: bold;
                padding: 8px;
                border-radius: 5px;
            }}
            QPushButton:disabled {{
                background-color: #4C566A;
            }}
        """

    self.start_button = QPushButton("START TEST")
    self.start_button.clicked.connect(self.start_test)
    self.start_button.setStyleSheet(action_button_css(self.accent_color.name()))
    self.start_button.setEnabled(False)
    button_layout.addWidget(self.start_button)

    self.stop_button = QPushButton("STOP TEST")
    self.stop_button.clicked.connect(self.stop_test)
    self.stop_button.setStyleSheet(action_button_css(self.warning_color.name()))
    self.stop_button.setEnabled(False)
    button_layout.addWidget(self.stop_button)

    # Continuous mode checkbox
    self.continuous_check = QCheckBox("Continuous Mode")
    self.continuous_check.setChecked(True)
    self.continuous_check.setToolTip("Run multiple charge/discharge cycles until stopped")
    button_layout.addWidget(self.continuous_check)

    # CV mode checkbox. It is added to the layout exactly once, at the
    # bottom where it previously ended up; a widget must not be inserted
    # into the same layout twice.
    self.cv_mode_check = QCheckBox("Enable CV Charge")
    self.cv_mode_check.setChecked(False)
    self.cv_mode_check.setToolTip("Enable constant voltage charge phase after CC charge")
    button_layout.addWidget(self.cv_mode_check)

    controls_layout.addWidget(button_frame)
    self.main_layout.addWidget(controls_frame)

    # Plot area
    self._setup_plot()
    self.main_layout.addWidget(self.plot_widget, 1)

    # Status bar
    self.status_bar = QLabel("Ready")
    self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;")
    self.main_layout.addWidget(self.status_bar)
|
|
|
|
def _setup_plot(self):
    """Create the voltage/current plot and the widget that hosts it."""
    text_color = self.fg_color.name()
    voltage_color = '#00BFFF'

    self.plot_widget = QWidget()
    plot_layout = QVBoxLayout(self.plot_widget)

    self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440')
    self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
    self.ax = self.fig.add_subplot(111)
    self.ax.set_facecolor('#3B4252')

    # Left axis: voltage, initially spanning the configured cutoffs plus padding
    voltage_padding = 0.2
    lower_cutoff = float(self.discharge_cutoff_input.text())
    upper_cutoff = float(self.charge_cutoff_input.text())
    self.ax.set_ylim(max(0, lower_cutoff - voltage_padding), upper_cutoff + voltage_padding)

    self.line_voltage, = self.ax.plot([], [], color=voltage_color, label='Voltage (V)', linewidth=2)
    self.ax.set_ylabel("Voltage (V)", color=voltage_color)
    self.ax.tick_params(axis='y', labelcolor=voltage_color)

    # Right axis: current, symmetric around zero at 1.5x the test current
    self.ax2 = self.ax.twinx()
    current_padding = 0.05
    test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text())
    current_limit = test_current * 1.5 + current_padding
    self.ax2.set_ylim(-current_limit, current_limit)

    self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
    self.ax2.set_ylabel("Current (A)", color='r')
    self.ax2.tick_params(axis='y', labelcolor='r')

    # Shared decoration
    self.ax.set_xlabel('Time (s)', color=text_color)
    self.ax.set_title('Battery Test (CC)', color=text_color)
    self.ax.tick_params(axis='x', colors=text_color)
    self.ax.grid(True, color='#4C566A')

    # One legend per axis, pinned to opposite top corners
    self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
    self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))

    # Embed the figure in the Qt widget
    self.canvas = FigureCanvas(self.fig)
    plot_layout.addWidget(self.canvas)
|
|
|
|
def safe_init_device(self):
    """Run ``init_device`` and route any failure to ``handle_device_error``."""
    try:
        self.init_device()
    except Exception as exc:
        message = str(exc)
        self.handle_device_error(message)
|
|
|
|
def init_device(self):
    """Initialize the ADALM1000 device.

    Cleans up any previous session, opens a new pysmu session, puts both
    channels of the first device into HI_Z at 0, starts the session,
    updates the connection widgets and starts the measurement thread.

    Raises:
        Exception: With a ``"Device initialization failed"`` message when
            any step fails; the original error is chained as ``__cause__``.
    """
    # Temporarily enable USB debugging
    os.environ['LIBUSB_DEBUG'] = '3'  # Set to 0 in production
    self.cleanup_device()

    try:
        print("Waiting before initializing session...")
        time.sleep(1.5)  # Delay helps avoid "device busy" issues

        self.session = pysmu.Session()

        # Log detected devices
        print(f"Devices found: {self.session.devices}")

        if not self.session.devices:
            raise Exception("No ADALM1000 detected - check USB connection")

        self.dev = self.session.devices[0]

        # Reset channels
        self.dev.channels['A'].mode = pysmu.Mode.HI_Z
        self.dev.channels['B'].mode = pysmu.Mode.HI_Z
        self.dev.channels['A'].constant(0)
        self.dev.channels['B'].constant(0)

        self.session.start(0)

        # Update UI
        self.status_light.setStyleSheet("background-color: green; border-radius: 10px;")
        self.connection_label.setText("Connected")
        self.status_bar.setText("Device connected | Ready for measurement")
        self.session_active = True
        self.start_button.setEnabled(True)

        # Start measurement thread
        self.start_measurement_thread()

    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise Exception(f"Device initialization failed: {str(e)}") from e
|
|
|
|
def cleanup_device(self):
    """Stop worker threads, end the device session and reset the UI state.

    Every step is best effort: failures are printed and cleanup continues,
    so this method can be called from error handlers.
    """
    print("Cleaning up device session...")

    # Raise the stop flag first: the test sequence polls it, so it must be
    # set before we wait for the threads below.
    self.request_stop = True

    # Stop threads with timeout
    for thread in (self.measurement_thread, getattr(self, 'test_thread', None)):
        if thread and thread.isRunning():
            try:
                # Not every worker necessarily offers stop(); the wait and
                # terminate fallback below must still run in that case.
                stop = getattr(thread, 'stop', None)
                if callable(stop):
                    stop()
                if not thread.wait(800):  # Reduced timeout
                    thread.terminate()
                    print(f"Warning: {thread.__class__.__name__} required termination")
            except Exception as e:
                print(f"Error stopping {thread.__class__.__name__}: {str(e)}")

    # Clean up session if it exists
    if getattr(self, 'session', None):
        try:
            if self.session_active:
                # Quick channel reset if device exists
                if hasattr(self, 'dev'):
                    try:
                        self.dev.channels['A'].mode = pysmu.Mode.HI_Z
                        self.dev.channels['B'].mode = pysmu.Mode.HI_Z
                    except Exception:
                        pass  # Best effort cleanup

                self.session.end()
            del self.session
        except Exception as e:
            print(f"Session cleanup error: {str(e)}")
        finally:
            self.session_active = False

    # Reset all states and UI (request_stop stays True from above)
    self.measuring = False
    self.test_running = False

    self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
    self.connection_label.setText("Disconnected")
    self.start_button.setEnabled(False)
    self.stop_button.setEnabled(False)
|
|
|
|
def start_measurement_thread(self):
    """Replace any existing measurement thread with a freshly started one."""
    if getattr(self, 'dev', None) is None:
        print("Device not initialized, cannot start measurement thread")
        return

    # Shut down the previous worker before creating a new one
    previous = self.measurement_thread
    if previous is not None:
        previous.stop()
        previous.wait(500)

    worker = MeasurementThread(
        device=self.dev,
        interval=self.interval,
        start_time=self.start_time,
    )
    self.measurement_thread = worker

    # Readings update the UI; failures go to the shared error handler
    worker.update_signal.connect(self.update_measurements)
    worker.error_signal.connect(self.handle_device_error)
    worker.start()
|
|
|
|
def reconnect_device(self):
    """Tear down the current session and retry initialization after 2 s."""
    retry_delay_ms = 2000
    self.status_bar.setText("Attempting to reconnect...")
    self.cleanup_device()
    # Give the USB device time to settle before reopening it
    QTimer.singleShot(retry_delay_ms, self.safe_init_device)
|
|
|
|
def handle_device_error(self, error_msg):
    """Thread-safe device error handling.

    When called from a worker thread the message is re-emitted through
    ``error_signal`` and nothing else happens in this call. In the main
    thread, device cleanup and the error dialog are deferred to the next
    event-loop iteration.

    Args:
        error_msg: Description of the failure (converted with ``str`` when
            forwarded through the signal).
    """
    try:
        # Ensure this runs in the main thread
        if QThread.currentThread() != self.thread():
            self.error_signal.emit(str(error_msg))
            return

        print(f"Device error: {error_msg}")

        def cleanup_and_report():
            # Release the device first, then tell the user.
            self.cleanup_device()
            self.status_bar.setText(f"Device error: {error_msg}")
            if self.isVisible():
                QMessageBox.critical(
                    self,
                    "Device Error",
                    f"Device error occurred:\n{error_msg}\n\n"
                    "1. Check USB connection\n"
                    "2. Try manual reconnect\n"
                    "3. Restart application if problems persist"
                )

        QTimer.singleShot(0, cleanup_and_report)

    except Exception as e:
        print(f"Error in error handler: {str(e)}")
        try:
            self.cleanup_device()
        except Exception:
            # Last-resort cleanup; do not mask interpreter-exit signals.
            pass
|
|
|
|
def start_test(self):
    """Start the complete battery test cycle with proper file initialization.

    Validates the input fields, clears recorded data, creates a new CSV log
    file and launches a ``TestSequenceThread``. Problems are reported with
    message boxes; nothing is raised to the caller.
    """
    # Check if test is already running
    if self.test_running:
        return

    # Verify thread safety objects exist
    if not hasattr(self, 'data_mutex') or self.data_mutex is None:
        QMessageBox.critical(self, "Error", "Thread safety not initialized")
        return

    try:
        with QMutexLocker(self.data_mutex):
            # Get and validate input values with error handling
            try:
                capacity = float(self.capacity_input.text())
                charge_cutoff = float(self.charge_cutoff_input.text())
                discharge_cutoff = float(self.discharge_cutoff_input.text())
                c_rate = float(self.c_rate_input.text())
                cv_cutoff = float(self.cv_cutoff_input.text())

                if capacity <= 0:
                    raise ValueError("Battery capacity must be positive")
                if charge_cutoff <= discharge_cutoff:
                    raise ValueError("Charge cutoff must be higher than discharge cutoff")
                if c_rate <= 0:
                    raise ValueError("C-rate must be positive")
                # The CV phase ends when |current| <= cutoff; a
                # non-positive cutoff could keep it running indefinitely.
                if self.cv_mode_check.isChecked() and cv_cutoff <= 0:
                    raise ValueError("CV cutoff current must be positive")

                test_current = c_rate * capacity
                if test_current > 0.2:
                    raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")

            except ValueError as e:
                QMessageBox.critical(self, "Input Error", str(e))
                return

            # Reset test state and data structures
            try:
                for key in self.test_data:
                    self.test_data[key].clear()
                self.capacity_ah = 0.0
                self.charge_capacity = 0.0
                self.coulomb_efficiency = 0.0
                self.cycle_count = 0
            except Exception as e:
                QMessageBox.critical(self, "Data Error", f"Couldn't reset test data: {str(e)}")
                return

            # Initialize timing
            self.start_time = time.time()
            self.time_label.setText("00:00:00")
            self.reset_plot()

            # Initialize log file with error handling
            try:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv")

                with open(self.filename, 'w', newline='') as f:
                    writer = csv.writer(f)
                    writer.writerow([
                        'Time(s)', 'Voltage(V)', 'Current(A)', 'Mode',
                        'Phase', 'Discharge(Ah)', 'Charge(Ah)', 'Efficiency(%)',
                        'Cycle'
                    ])

                # Kept open for the duration of the test; rows are appended.
                self.current_cycle_file = open(self.filename, 'a', newline='')
                self.log_writer = csv.writer(self.current_cycle_file)
                self.log_buffer = []

            except Exception as e:
                QMessageBox.critical(
                    self,
                    "File Error",
                    f"Failed to initialize log file:\n{str(e)}\n\n"
                    f"Check directory permissions:\n{self.log_dir}"
                )
                return

            # Start test with thread safety
            try:
                # A sequence from an earlier test may still be winding down.
                # Deal with it before clearing request_stop, which it polls.
                previous = self.test_thread
                if previous and previous.isRunning():
                    try:
                        # Its completion must not finalize the new test.
                        previous.finished.disconnect(self.finalize_test)
                    except (TypeError, RuntimeError):
                        pass  # was not connected
                    self.request_stop = True
                    stop = getattr(previous, 'stop', None)
                    if callable(stop):
                        stop()
                    if not previous.wait(3000):
                        raise RuntimeError("Previous test sequence is still shutting down")

                self.test_running = True
                self.request_stop = False
                self.test_phase = "Initializing"
                self.current_mode = "CC"
                self.continuous_mode = self.continuous_check.isChecked()
                self.cv_mode_enabled = self.cv_mode_check.isChecked()

                # UI updates
                self.start_button.setEnabled(False)
                self.stop_button.setEnabled(True)
                self.status_bar.setText(
                    f"Test started | Target: {discharge_cutoff}V @ {test_current:.3f}A | "
                    f"CV Mode: {'ON' if self.cv_mode_enabled else 'OFF'}"
                )

                self.test_thread = TestSequenceThread(self)
                self.test_thread.progress_updated.connect(self.update_test_progress)
                self.test_thread.cycle_completed.connect(self.update_cycle_stats)
                self.test_thread.error_occurred.connect(self.handle_device_error)
                self.test_thread.finished.connect(self.finalize_test)
                self.test_thread.start()

            except Exception as e:
                self.finalize_test(show_message=False)
                QMessageBox.critical(
                    self,
                    "Test Error",
                    f"Failed to start test thread:\n{str(e)}"
                )

    except Exception as e:
        self.finalize_test(show_message=False)
        QMessageBox.critical(
            self,
            "Critical Error",
            f"Unexpected error in test initialization:\n{str(e)}"
        )
|
|
|
|
def update_test_progress(self, progress: float, phase: str):
    """Show the latest progress report from the test thread.

    Args:
        progress: Progress value; it is multiplied by 100 and truncated
            to an int before being handed to the progress bar.
        phase: Phase name, stored on the instance and shown in the
            phase label.
    """
    # Remember the phase so other methods can read it
    self.test_phase = phase
    self.phase_label.setText(phase)

    percent = int(progress * 100)
    self.progress_bar.setValue(percent)
|
|
|
|
def update_cycle_stats(self, cycle: int, discharge: float, charge: float, efficiency: float):
    """Store a finished cycle's figures, refresh the UI and log a summary.

    Args:
        cycle: Cycle number reported by the test thread.
        discharge: Discharge capacity, displayed with an "Ah" suffix.
        charge: Charge capacity, displayed with an "Ah" suffix.
        efficiency: Efficiency, displayed with a "%" suffix.
    """
    # Keep the latest figures on the instance
    self.cycle_count = cycle
    self.capacity_ah = discharge
    self.charge_capacity = charge
    self.coulomb_efficiency = efficiency

    # Individual readouts
    self.cycle_label.setText(f"{cycle}")
    self.capacity_label.setText(f"{discharge:.4f}")
    self.charge_capacity_label.setText(f"{charge:.4f}")
    self.efficiency_label.setText(f"{efficiency:.1f}")

    # One-line recap in the status bar
    recap = (
        f"Cycle {cycle} completed | "
        f"Discharge: {discharge:.3f}Ah | "
        f"Charge: {charge:.3f}Ah | "
        f"Efficiency: {efficiency:.1f}%"
    )
    self.status_bar.setText(recap)

    # Persist the cycle summary to the log file
    self.write_cycle_summary()
|
|
|
|
def stop_test(self):
    """Stop a running test and return the UI to its idle state.

    Does nothing when no test is running. Otherwise it raises the stop
    flags, switches channel A to HI_Z with a constant of 0 (when a ``dev``
    attribute exists), updates the status widgets and finally calls
    ``finalize_test`` without a completion dialog.
    """
    if not self.test_running:
        return

    # Flag the stop and fall back to the idle phase
    self.request_stop = True
    self.test_running = False
    self.measuring = False
    self.test_phase = "Ready"
    self.phase_label.setText(self.test_phase)

    # Best-effort device reset; a failure is reported but does not abort the stop
    if hasattr(self, 'dev'):
        try:
            self.dev.channels['A'].mode = pysmu.Mode.HI_Z
            self.dev.channels['A'].constant(0)
        except Exception as e:
            print(f"Error resetting device: {e}")

    # Refresh the widgets
    self.status_bar.setText("Test stopped - Ready for new test")
    for button, enabled in ((self.stop_button, False), (self.start_button, True)):
        button.setEnabled(enabled)

    self.finalize_test(show_message=False)
|
|
|
|
def finalize_test(self, show_message: bool = True):
    """Clean up after a test: flush the log, close the file, stop the thread.

    Each cleanup step is isolated so that a failure in one step is recorded
    and the remaining steps still run. Recorded problems are listed in the
    completion dialog when one is shown.

    Args:
        show_message: When True and the window is visible, show a
            "Test Complete" dialog with the cycle count and any recorded
            problems.
    """
    errors = []

    try:
        # 1. Write any buffered rows that have not reached the file yet
        writer = getattr(self, 'log_writer', None)
        pending_rows = getattr(self, 'log_buffer', None)
        if writer is not None and pending_rows:
            try:
                writer.writerows(pending_rows)
                pending_rows.clear()
            except Exception as e:
                errors.append(f"Failed to write log buffer: {str(e)}")

        # 2. Sync and close the log file
        log_file = getattr(self, 'current_cycle_file', None)
        if log_file is not None:
            try:
                if not log_file.closed:
                    try:
                        log_file.flush()
                        os.fsync(log_file.fileno())
                    except Exception as e:
                        errors.append(f"Failed to sync file: {str(e)}")
                    finally:
                        log_file.close()
            except Exception as e:
                errors.append(f"File closure error: {str(e)}")
            finally:
                # Remove the attributes entirely: update_measurements uses
                # hasattr(self, 'log_writer') to decide whether to log.
                for attr in ('current_cycle_file', 'log_writer'):
                    if hasattr(self, attr):
                        delattr(self, attr)

        # 3. Stop the worker thread and drop the reference
        try:
            thread = getattr(self, 'test_thread', None)
            if thread is not None:
                if thread.isRunning():
                    thread.stop()
                    if not thread.wait(500):  # 500ms timeout
                        errors.append("Test thread didn't stop cleanly")
                self.test_thread = None
        except Exception as e:
            errors.append(f"Thread cleanup error: {str(e)}")

        # 4. Reset test state and buttons
        try:
            self.test_running = False
            self.request_stop = True
            self.measuring = False
            self.stop_button.setEnabled(False)
            self.start_button.setEnabled(True)
        except Exception as e:
            errors.append(f"State reset error: {str(e)}")

        # 5. Show the completion dialog if requested
        if show_message and self.isVisible():
            try:
                msg = "Test completed"
                if errors:
                    msg += "\n\nMinor issues occurred:\n- " + "\n- ".join(errors)

                QMessageBox.information(
                    self,
                    "Test Complete",
                    f"{msg}\nCycles: {self.cycle_count}",
                    QMessageBox.Ok
                )
            except Exception as e:
                print(f"Failed to show completion message: {str(e)}")

    except Exception as e:
        # Something outside the guarded steps failed; fall back to the minimum
        print(f"Critical error in finalize_test: {str(e)}")
        leftover_file = getattr(self, 'current_cycle_file', None)
        if leftover_file is not None:
            try:
                leftover_file.close()
            except Exception as close_error:
                print(f"Failed to close log file: {close_error}")

        # Ensure basic state is reset
        self.test_running = False
        self.request_stop = True
        self.measuring = False

    finally:
        # Never carry buffered rows over into the next test
        leftover_rows = getattr(self, 'log_buffer', None)
        if leftover_rows is not None:
            try:
                leftover_rows.clear()
            except Exception as e:
                print(f"Failed to clear log buffer: {e}")
|
|
|
|
def update_measurements(self, voltage: float, current: float, current_time: float):
    """Store one sample, refresh the readouts and log it while a test runs.

    Args:
        voltage: Measured voltage; shown with 4 decimals, logged with 6.
        current: Measured current; shown with 4 decimals, logged with 6.
        current_time: Sample time in seconds; appended to the time series
            and rendered through ``format_time``.

    Errors are printed rather than raised.
    """
    if not hasattr(self, '_mutex'):
        print("Critical Error: Mutex not initialized")
        return

    try:
        with QMutexLocker(self._mutex):
            # Make room first so that no series exceeds max_points after
            # the appends below.
            max_points = 10000
            for series in self.test_data.values():
                while len(series) >= max_points:
                    series.popleft()

            # Store data
            self.test_data['time'].append(current_time)
            self.test_data['voltage'].append(voltage)
            self.test_data['current'].append(current)
            self.test_data['phase'].append(self.test_phase)
            self.test_data['mode'].append(self.current_mode)

            # Update UI
            self.voltage_label.setText(f"{voltage:.4f}")
            self.current_label.setText(f"{current:.4f}")
            self.time_label.setText(self.format_time(current_time))

            # Redraw on every 10th sample. A dedicated counter is used
            # because the series length stops changing once the buffer is
            # full, which would otherwise freeze the plot.
            self._samples_since_plot = getattr(self, '_samples_since_plot', 0) + 1
            if self._samples_since_plot >= 10:
                self._samples_since_plot = 0
                self.update_plot()

            # Log data if test is running
            if self.test_running and hasattr(self, 'log_writer'):
                # Field order must match the CSV header: Time, Voltage,
                # Current, Mode, Phase, Discharge, Charge, Efficiency, Cycle.
                self.log_buffer.append([
                    f"{current_time:.3f}",
                    f"{voltage:.6f}",
                    f"{current:.6f}",
                    self.current_mode,
                    self.test_phase,
                    f"{self.capacity_ah:.4f}",
                    f"{self.charge_capacity:.4f}",
                    f"{self.coulomb_efficiency:.1f}",
                    f"{self.cycle_count}"
                ])

                # Write data in blocks of 10 rows
                if len(self.log_buffer) >= 10:
                    try:
                        self.log_writer.writerows(self.log_buffer)
                        if getattr(self, 'current_cycle_file', None):
                            self.current_cycle_file.flush()
                        self.log_buffer.clear()
                    except Exception as e:
                        print(f"Log write error: {e}")
    except Exception as e:
        print(f"Error in update_measurements: {str(e)}")
|
|
|
|
def write_cycle_summary(self):
    """Append a one-line summary of the current cycle to the open log file.

    Buffered measurement rows are written first so the summary lands after
    the data it describes. Does nothing when no log file is open; write
    errors are printed rather than raised.
    """
    log_file = getattr(self, 'current_cycle_file', None)
    if not log_file:
        return

    try:
        # cycle_count, capacity_ah, charge_capacity and coulomb_efficiency
        # are plain numbers (see update_cycle_stats), so they are formatted
        # directly rather than through a .get() accessor.
        summary_line = (
            f"Cycle {self.cycle_count} Summary - "
            f"Discharge={self.capacity_ah:.4f}Ah, "
            f"Charge={self.charge_capacity:.4f}Ah, "
            f"Efficiency={self.coulomb_efficiency:.1f}%"
        )

        if self.log_buffer:
            self.log_writer.writerows(self.log_buffer)
            self.log_buffer.clear()
        log_file.write(summary_line + "\n")
        log_file.flush()
    except Exception as e:
        print(f"Error writing cycle summary: {e}")
|
|
|
|
def update_plot(self):
    """Push the stored voltage and current series to the plot lines.

    Returns immediately when no samples are stored. Otherwise both lines
    receive copies of the time series and their own values, the axes are
    rescaled and a redraw is requested.
    """
    series = self.test_data
    if not series['time']:
        return

    # Each line gets its own snapshot of the time axis and its values
    for line, key in ((self.line_voltage, 'voltage'), (self.line_current, 'current')):
        line.set_data(list(series['time']), list(series[key]))

    self.auto_scale_axes()
    self.canvas.draw_idle()
|
|
|
|
def auto_scale_axes(self):
    """Fit the plot axes to the samples held in ``test_data``.

    The x range grows (with 10% padding) once the newest time passes 95%
    of the current upper limit. The voltage axis is padded by 0.2 and kept
    within 0..5.0; the current axis is padded by 0.05 and kept within
    -0.25..0.25.
    """
    data = self.test_data
    if not data['time']:
        return

    # X axis: extend both axes together when the data nears the right edge
    newest_time = data['time'][-1]
    x_upper = self.ax.get_xlim()[1]
    if newest_time > x_upper * 0.95:
        padded_upper = newest_time * 1.10  # 10% padding
        for axis in (self.ax, self.ax2):
            axis.set_xlim(0, padded_upper)

    # Y axes: padded data range, clamped to fixed bounds
    volts = data['voltage']
    if volts:
        v_pad = 0.2
        lowest_v = max(0, min(volts) - v_pad)
        highest_v = min(5.0, max(volts) + v_pad)
        self.ax.set_ylim(lowest_v, highest_v)

    amps = data['current']
    if amps:
        c_pad = 0.05
        lowest_c = max(-0.25, min(amps) - c_pad)
        highest_c = min(0.25, max(amps) + c_pad)
        self.ax2.set_ylim(lowest_c, highest_c)
|
|
|
|
@staticmethod
def format_time(seconds: float) -> str:
    """Render a duration given in seconds as ``HH:MM:SS``.

    Fractions of a second are dropped. Hours are not wrapped, so long
    durations produce more than two hour digits.

    Args:
        seconds: Time in seconds

    Returns:
        Formatted time string
    """
    hh = int(seconds // 3600)
    mm = int(seconds % 3600 // 60)
    ss = int(seconds % 60)
    return "{:02d}:{:02d}:{:02d}".format(hh, mm, ss)
|
|
|
|
def reset_plot(self):
    """Clear both plot lines and restore the starting axis ranges.

    The x axis returns to 0..10. The voltage axis spans the discharge and
    charge cutoff values read from their input fields, widened by 0.2 on
    each side, with the lower bound floored at 0.
    """
    for line in (self.line_voltage, self.line_current):
        line.set_data([], [])

    # Reset axes to starting values
    self.ax.set_xlim(0, 10)
    pad = 0.2
    lower = max(0, float(self.discharge_cutoff_input.text()) - pad)
    upper = float(self.charge_cutoff_input.text()) + pad
    self.ax.set_ylim(lower, upper)

    self.canvas.draw()
|
|
|
|
def closeEvent(self, event):
    """Run device cleanup, then accept the window's close event.

    Args:
        event: The close event passed in; it is always accepted.
    """
    # Cleanup runs first so it has happened by the time the close is accepted
    self.cleanup_device()
    event.accept()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: create the Qt application with the Fusion style,
    # show the tester window and run the event loop until it exits.
    app = QApplication([])
    app.setStyle('Fusion')

    window = BatteryTester()
    window.show()

    app.exec_()