# Initialization order:
#   - All attribute initializations were moved to the start of __init__.
#   - self.test_phase is set to "Idle" before anything reads it.
# UI setup:
#   - The phase label text is set only after self.test_phase has been initialized.
#   - Every attribute exists before it is accessed.
# Error prevention:
#   - The QStandardPaths warning is still suppressed.
#   - All UI elements are initialized before they are used.
(D)
1037 lines
40 KiB
Python
1037 lines
40 KiB
Python
# -*- coding: utf-8 -*-
|
|
import os
|
|
import time
|
|
import csv
|
|
import threading
|
|
from datetime import datetime
|
|
import numpy as np
|
|
from collections import deque
|
|
|
|
# Suppress QStandardPaths warning
|
|
os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false'
|
|
|
|
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
|
|
QGridLayout, QLabel, QPushButton, QLineEdit, QFrame,
|
|
QCheckBox, QMessageBox, QFileDialog)
|
|
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread
|
|
from PyQt5.QtGui import QColor, QPalette
|
|
|
|
import pysmu
|
|
import matplotlib
|
|
matplotlib.use('Qt5Agg')
|
|
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
|
|
from matplotlib.figure import Figure
|
|
|
|
|
|
class DeviceDisconnectedError(Exception):
    """Raised when the measurement loop receives no samples from the device."""
|
|
|
|
|
|
class MeasurementThread(QObject):
    """Worker that polls the device and emits smoothed voltage/current readings.

    This is a ``QObject`` rather than a thread; its ``run`` method is meant to
    be executed by whatever thread the object has been moved to.

    Signals:
        update_signal(voltage, current, elapsed_seconds): one smoothed reading.
        error_signal(message): text of the exception that ended the loop.
    """

    update_signal = pyqtSignal(float, float, float)
    error_signal = pyqtSignal(str)

    def __init__(self, device, interval=0.1):
        """Store the device handle and polling interval (seconds)."""
        super().__init__()
        self.device = device
        self.interval = interval
        self.running = False
        # Number of samples per read and length of the moving-average window.
        self.filter_window_size = 10
        # Reference point for the elapsed time reported with every reading.
        self.start_time = time.time()

    def run(self):
        """Poll the device until stop() is called or an error occurs."""
        self.running = True
        recent_voltages = []
        recent_currents = []

        while self.running:
            try:
                batch = self.device.read(self.filter_window_size, 500, True)
                if not batch:
                    raise DeviceDisconnectedError("No samples received")

                # sample[1][0]: channel B voltage, sample[0][1]: channel A
                # current (channel/field meaning as noted by the original author).
                batch_voltage = np.mean([sample[1][0] for sample in batch])
                batch_current = np.mean([sample[0][1] for sample in batch])
                elapsed = time.time() - self.start_time

                # Moving average over the most recent batch means.
                recent_voltages.append(batch_voltage)
                recent_currents.append(batch_current)
                if len(recent_voltages) > self.filter_window_size:
                    del recent_voltages[0]
                    del recent_currents[0]

                self.update_signal.emit(
                    np.mean(recent_voltages), np.mean(recent_currents), elapsed
                )
                # Never poll faster than every 50 ms.
                time.sleep(max(0.05, self.interval))
            except Exception as exc:
                # Report the failure and leave the loop; the receiver decides
                # how to recover.
                self.error_signal.emit(str(exc))
                break

    def stop(self):
        """Ask run() to leave its loop after the current iteration."""
        self.running = False
|
|
|
|
|
|
class BatteryTester(QMainWindow):
|
|
def __init__(self):
    """Create the window state, build the UI and schedule device start-up."""
    super().__init__()

    # Sample buffers filled by update_measurements().
    self.time_data, self.voltage_data = deque(), deque()
    self.current_data, self.phase_data = deque(), deque()

    # Test bookkeeping; test_phase has to exist before setup_ui() reads it.
    self.test_phase = "Idle"
    self.capacity_ah = self.charge_capacity = self.coulomb_efficiency = 0.0
    self.cycle_count = 0

    # Colours used by setup_ui() and setup_plot().
    self.bg_color = QColor(46, 52, 64)
    self.fg_color = QColor(216, 222, 233)
    self.accent_color = QColor(94, 129, 172)
    self.warning_color = QColor(191, 97, 106)
    self.success_color = QColor(163, 190, 140)

    # Device / measurement flags.
    self.session_active = self.measuring = self.test_running = False
    self.continuous_mode = self.request_stop = False
    self.interval = 0.1

    # Per-cycle CSV logs are written below the user's home directory.
    self.log_dir = os.path.expanduser("~/adalm1000/logs")
    os.makedirs(self.log_dir, exist_ok=True)

    # Handles for the measurement worker/thread and the test thread.
    self.measurement_thread = None
    self.measurement_worker = None
    self.test_thread = None

    self.setup_ui()

    # Connect to the device shortly after construction has finished.
    QTimer.singleShot(100, self.init_device)
|
|
|
|
def setup_ui(self):
    """Configure the user interface.

    Builds, top to bottom: the header (title, connection status, reconnect
    button), the grid of live readouts, the parameter inputs with the
    start/stop controls, the plot and a one-line status label.
    """
    self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)")
    self.resize(1000, 800)
    self.setMinimumSize(800, 700)

    # Window palette
    palette = self.palette()
    for role, color in (
        (QPalette.Window, self.bg_color),
        (QPalette.WindowText, self.fg_color),
        (QPalette.Base, QColor(59, 66, 82)),
        (QPalette.Text, self.fg_color),
        (QPalette.Button, self.accent_color),
        (QPalette.ButtonText, self.fg_color),
    ):
        palette.setColor(role, color)
    self.setPalette(palette)

    # Main widget and layout
    self.central_widget = QWidget()
    self.setCentralWidget(self.central_widget)
    self.main_layout = QVBoxLayout(self.central_widget)
    self.main_layout.setContentsMargins(10, 10, 10, 10)
    self.main_layout.setSpacing(10)

    # Header area
    header_frame = QWidget()
    header_layout = QHBoxLayout(header_frame)
    header_layout.setContentsMargins(0, 0, 0, 0)

    self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)")
    self.title_label.setStyleSheet(
        f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};"
    )
    header_layout.addWidget(self.title_label, 1)

    # Connection status: text plus a round indicator (red until connected)
    self.connection_label = QLabel("Disconnected")
    header_layout.addWidget(self.connection_label)

    self.status_light = QLabel()
    self.status_light.setFixedSize(20, 20)
    self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
    header_layout.addWidget(self.status_light)

    self.reconnect_btn = QPushButton("Reconnect")
    self.reconnect_btn.clicked.connect(self.reconnect_device)
    header_layout.addWidget(self.reconnect_btn)

    self.main_layout.addWidget(header_frame)

    # Measurement display: two readouts per grid row
    display_frame = QFrame()
    display_frame.setFrameShape(QFrame.StyledPanel)
    display_layout = QGridLayout(display_frame)

    # (caption, unit, attribute that receives the value label)
    readouts = [
        ("Voltage (V)", "V", "voltage_label"),
        ("Current (A)", "A", "current_label"),
        ("Test Phase", "", "phase_label"),
        ("Elapsed Time", "s", "time_label"),
        ("Discharge Capacity", "Ah", "capacity_label"),
        ("Charge Capacity", "Ah", "charge_capacity_label"),
        ("Coulomb Eff.", "%", "efficiency_label"),
        ("Cycle Count", "", "cycle_label"),
    ]

    for index, (caption, unit, attr_name) in enumerate(readouts):
        row = index // 2
        # Each readout needs three cells (caption, value, unit). A stride
        # of 2 put the left readout's unit into the same cell as the right
        # readout's caption, so the two widgets were drawn on top of each
        # other.
        col = (index % 2) * 3

        caption_label = QLabel(f"{caption}:")
        caption_label.setStyleSheet("font-size: 11px;")
        display_layout.addWidget(caption_label, row, col)

        value_label = QLabel("0.000")
        value_label.setStyleSheet("font-size: 12px; font-weight: bold;")
        display_layout.addWidget(value_label, row, col + 1)

        if unit:
            display_layout.addWidget(QLabel(unit), row, col + 2)

        setattr(self, attr_name, value_label)

    # The phase readout shows text rather than a number.
    self.phase_label.setText(self.test_phase)

    self.main_layout.addWidget(display_frame)

    # Control area
    controls_frame = QWidget()
    controls_layout = QHBoxLayout(controls_frame)
    controls_layout.setContentsMargins(0, 0, 0, 0)

    # Parameters frame
    params_frame = QFrame()
    params_frame.setFrameShape(QFrame.StyledPanel)
    params_layout = QGridLayout(params_frame)

    # (caption, attribute that receives the input, default text)
    param_fields = (
        ("Battery Capacity (Ah):", "capacity_input", "0.2"),
        ("Charge Cutoff (V):", "charge_cutoff_input", "1.43"),
        ("Discharge Cutoff (V):", "discharge_cutoff_input", "0.9"),
        ("Rest Time (hours):", "rest_time_input", "0.25"),
    )
    for row, (caption, attr_name, default_text) in enumerate(param_fields):
        params_layout.addWidget(QLabel(caption), row, 0)
        field = QLineEdit(default_text)
        field.setFixedWidth(80)
        params_layout.addWidget(field, row, 1)
        setattr(self, attr_name, field)

    # C-rate for test (second column block of the first row)
    params_layout.addWidget(QLabel("Test C-rate:"), 0, 2)
    self.c_rate_input = QLineEdit("0.1")
    self.c_rate_input.setFixedWidth(60)
    params_layout.addWidget(self.c_rate_input, 0, 3)
    params_layout.addWidget(QLabel("(e.g., 0.2 for C/5)"), 0, 4)

    controls_layout.addWidget(params_frame, 1)

    # Button frame
    button_frame = QWidget()
    button_layout = QVBoxLayout(button_frame)
    button_layout.setContentsMargins(0, 0, 0, 0)

    self.start_button = QPushButton("START TEST")
    self.start_button.clicked.connect(self.start_test)
    self.start_button.setStyleSheet(
        f"background-color: {self.accent_color.name()}; font-weight: bold;"
    )
    self.start_button.setEnabled(False)  # Disabled until device is connected
    button_layout.addWidget(self.start_button)

    self.stop_button = QPushButton("STOP TEST")
    self.stop_button.clicked.connect(self.stop_test)
    self.stop_button.setStyleSheet(
        f"background-color: {self.warning_color.name()}; font-weight: bold;"
    )
    self.stop_button.setEnabled(False)
    button_layout.addWidget(self.stop_button)

    # Continuous mode checkbox
    self.continuous_check = QCheckBox("Continuous Mode")
    self.continuous_check.setChecked(True)
    button_layout.addWidget(self.continuous_check)

    controls_layout.addWidget(button_frame)
    self.main_layout.addWidget(controls_frame)

    # Plot area; setup_plot() reads the parameter inputs created above.
    self.setup_plot()
    self.main_layout.addWidget(self.plot_widget, 1)

    # Status line
    self.status_bar = QLabel("Ready")
    self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;")
    self.main_layout.addWidget(self.status_bar)
|
|
|
|
def setup_plot(self):
    """Create the embedded figure with a voltage axis and a twin current axis.

    Initial axis ranges are derived from the parameter inputs, so those
    inputs must already exist when this runs.
    """
    self.plot_widget = QWidget()
    container_layout = QVBoxLayout(self.plot_widget)

    self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440')
    self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
    self.ax = self.fig.add_subplot(111)
    self.ax.set_facecolor('#3B4252')

    # Voltage range: cutoff window widened by a fixed margin, floored at 0 V.
    voltage_margin = 0.2
    lowest_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_margin)
    highest_voltage = float(self.charge_cutoff_input.text()) + voltage_margin
    self.ax.set_ylim(lowest_voltage, highest_voltage)

    # Voltage trace on the left axis.
    voltage_color = '#00BFFF'
    self.line_voltage, = self.ax.plot(
        [], [], color=voltage_color, label='Voltage (V)', linewidth=2
    )
    self.ax.set_ylabel("Voltage (V)", color=voltage_color)
    self.ax.tick_params(axis='y', labelcolor=voltage_color)

    # Current trace on the right axis, symmetric around zero.
    self.ax2 = self.ax.twinx()
    current_margin = 0.05
    nominal_current = float(self.c_rate_input.text()) * float(self.capacity_input.text())
    current_span = nominal_current * 1.5
    self.ax2.set_ylim(-current_span - current_margin, current_span + current_margin)

    self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
    self.ax2.set_ylabel("Current (A)", color='r')
    self.ax2.tick_params(axis='y', labelcolor='r')

    # Shared x axis, title and grid use the window's foreground colour.
    text_color = self.fg_color.name()
    self.ax.set_xlabel('Time (s)', color=text_color)
    self.ax.set_title('Battery Test (CC)', color=text_color)
    self.ax.tick_params(axis='x', colors=text_color)
    self.ax.grid(True, color='#4C566A')

    # One legend per axis, pinned to opposite top corners.
    self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
    self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))

    # Embed the figure in the Qt widget.
    self.canvas = FigureCanvas(self.fig)
    container_layout.addWidget(self.canvas)
|
|
|
|
def init_device(self):
    """Open a pysmu session on the first device and start measuring.

    Any previous session is ended first. Both channels are put into HI_Z
    with a zero constant output before the session is started. On success
    the connection indicators are updated, the start button is enabled and
    the measurement thread is started. Every failure is passed to
    handle_device_error() instead of being raised.
    """
    try:
        # Clean up any existing session
        if hasattr(self, 'session'):
            try:
                self.session.end()
            except Exception as e:
                # Best effort: a dead session may refuse to end cleanly.
                print(f"Error ending previous session: {e}")
            finally:
                # Drop the stale handle even if end() failed.
                del self.session

        # Short settle delay before (re)opening the device.
        time.sleep(1)

        self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
        if not self.session.devices:
            raise DeviceDisconnectedError("No ADALM1000 detected - check connections")

        self.dev = self.session.devices[0]

        # Reset channels to a non-driving state
        for channel_name in ('A', 'B'):
            self.dev.channels[channel_name].mode = pysmu.Mode.HI_Z
        for channel_name in ('A', 'B'):
            self.dev.channels[channel_name].constant(0)

        # NOTE(review): start(0) presumably means continuous sampling, as
        # the original docstring said - confirm against the pysmu docs.
        self.session.start(0)

        self.status_light.setStyleSheet("background-color: green; border-radius: 10px;")
        self.connection_label.setText("Connected")
        self.status_bar.setText("Device connected | Ready to measure")
        self.session_active = True
        self.start_button.setEnabled(True)

        # Start measurement thread
        self.start_measurement_thread()

    except Exception as e:
        self.handle_device_error(e)
|
|
|
|
def start_measurement_thread(self):
    """Start the continuous measurement worker on a fresh QThread.

    A previously running worker is stopped first. The handles are
    initialised to ``None`` in __init__, so they are checked for ``None``
    rather than with ``hasattr``.
    """
    previous_thread = getattr(self, 'measurement_thread', None)
    previous_worker = getattr(self, 'measurement_worker', None)
    if previous_thread is not None and previous_thread.isRunning():
        # The worker's run() loop blocks the thread's event loop, so quit()
        # only takes effect once the loop has been told to stop; without
        # this, wait() would block forever.
        if previous_worker is not None:
            previous_worker.stop()
        previous_thread.quit()
        previous_thread.wait()

    self.measurement_thread = QThread()
    self.measurement_worker = MeasurementThread(self.dev, self.interval)
    self.measurement_worker.moveToThread(self.measurement_thread)

    self.measurement_thread.started.connect(self.measurement_worker.run)
    self.measurement_worker.update_signal.connect(self.update_measurements)
    self.measurement_worker.error_signal.connect(self.handle_device_error)

    self.measurement_thread.start()
|
|
|
|
@pyqtSlot(float, float, float)
def update_measurements(self, voltage, current, current_time):
    """Store one reading, refresh the readouts and log it while a test runs.

    Args:
        voltage: smoothed voltage reading.
        current: smoothed current reading.
        current_time: seconds since the measurement worker was created.
    """
    self.time_data.append(current_time)
    self.voltage_data.append(voltage)
    self.current_data.append(current)

    # Update display
    self.voltage_label.setText(f"{voltage:.4f}")
    self.current_label.setText(f"{current:.4f}")
    self.time_label.setText(self.format_time(current_time))

    # Update plot periodically
    if len(self.time_data) % 10 == 0:
        self.update_plot()

    # Log only while a test is running AND a cycle log file is really open.
    # start_test() sets current_cycle_file to None before the first file is
    # created, so an attribute-existence check alone is not sufficient.
    log_file = getattr(self, 'current_cycle_file', None)
    log_buffer = getattr(self, 'log_buffer', None)
    log_writer = getattr(self, 'log_writer', None)
    if (not self.test_running or log_file is None or log_file.closed
            or log_buffer is None or log_writer is None):
        return

    log_buffer.append([
        f"{current_time:.3f}",
        f"{voltage:.6f}",
        f"{current:.6f}",
        self.test_phase,
        f"{self.capacity_ah:.4f}",
        f"{self.charge_capacity:.4f}",
        f"{self.coulomb_efficiency:.1f}",
        f"{self.cycle_count}"
    ])

    # Write in chunks of 10 samples, through the writer that owns the open
    # file. Appending through a second handle while the first one still
    # holds the buffered header lets the two writes overwrite each other.
    if len(log_buffer) >= 10:
        try:
            log_writer.writerows(log_buffer)
            log_file.flush()
        except (OSError, ValueError) as e:
            # ValueError: the file was closed by another code path.
            print(f"Error writing log data: {e}")
        finally:
            # Drop the chunk either way so the buffer cannot grow unbounded.
            log_buffer.clear()
|
|
|
|
def start_test(self):
    """Validate the inputs, reset all test state and launch the test thread.

    Does nothing while a test is already running. Any parsing or validation
    problem is shown in an error dialog instead of being raised.
    """
    if self.test_running:
        return

    try:
        # Read the parameters (raises ValueError on non-numeric text).
        capacity = float(self.capacity_input.text())
        charge_cutoff = float(self.charge_cutoff_input.text())
        discharge_cutoff = float(self.discharge_cutoff_input.text())
        c_rate = float(self.c_rate_input.text())

        # First failing rule wins.
        for violated, message in (
            (capacity <= 0, "Battery capacity must be positive"),
            (charge_cutoff <= discharge_cutoff,
             "Charge cutoff must be higher than discharge cutoff"),
            (c_rate <= 0, "C-rate must be positive"),
        ):
            if violated:
                raise ValueError(message)

        self.continuous_mode = self.continuous_check.isChecked()

        # Reset timing for the new test.
        self.measurement_start_time = time.time()
        self.test_start_time = time.time()

        # Target current follows from C-rate and capacity.
        test_current = c_rate * capacity
        if test_current > 0.2:
            raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")

        # Forget everything from the previous run.
        for series in (self.time_data, self.voltage_data,
                       self.current_data, self.phase_data):
            series.clear()
        self.capacity_ah = 0.0
        self.charge_capacity = 0.0
        self.coulomb_efficiency = 0.0
        self.cycle_count = 0

        self.reset_plot()

        # Base name for the per-cycle log files; the cycle suffix is added
        # when each file is created.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
        self.current_cycle_file = None

        # Mark the test as running.
        self.test_running = True
        self.start_time = time.time()
        self.last_update_time = time.time()
        self.test_phase = "Initial Discharge"
        self.phase_label.setText(self.test_phase)

        self.start_button.setEnabled(False)
        self.stop_button.setEnabled(True)
        self.status_bar.setText(f"Test started | Discharging to {discharge_cutoff}V @ {test_current:.3f}A")

        # The sequence itself runs on a daemon thread.
        self.test_thread = threading.Thread(target=self.run_test_sequence, daemon=True)
        self.test_thread.start()

    except Exception as e:
        QMessageBox.critical(self, "Error", str(e))
|
|
|
|
def create_cycle_log_file(self):
    """Open a new CSV log for the current cycle and write its header row.

    The file name is ``<base_filename>_<n>.csv`` with the first ``n >= 1``
    that does not exist yet.

    Returns:
        True when the file was created, False when the log directory is not
        writable or the file could not be opened (an error dialog is shown).
    """
    # Close the previous cycle's file, if one is still around.
    previous_file = getattr(self, 'current_cycle_file', None)
    if previous_file:
        try:
            previous_file.close()
        except Exception as e:
            print(f"Error closing previous log file: {e}")

    # Refuse early when the directory cannot be written to.
    if not os.access(self.log_dir, os.W_OK):
        QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}")
        return False

    # Pick the first unused sequential suffix.
    suffix = 1
    self.filename = f"{self.base_filename}_{suffix}.csv"
    while os.path.exists(self.filename):
        suffix += 1
        self.filename = f"{self.base_filename}_{suffix}.csv"

    try:
        self.current_cycle_file = open(self.filename, 'w', newline='')
        self.log_writer = csv.writer(self.current_cycle_file)
        header = ["Time(s)", "Voltage(V)", "Current(A)", "Phase",
                  "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
                  "Coulomb_Eff(%)", "Cycle"]
        self.log_writer.writerow(header)
        self.log_buffer = []
    except Exception as e:
        QMessageBox.critical(self, "Error", f"Failed to create log file: {e}")
        return False
    return True
|
|
|
|
@staticmethod
def format_time(seconds):
    """Render a duration in seconds as ``hh:mm:ss`` (hours may exceed 99)."""
    whole_hours, within_hour = divmod(seconds, 3600)
    fields = (int(whole_hours), int(within_hour // 60), int(seconds % 60))
    return "{:02d}:{:02d}:{:02d}".format(*fields)
|
|
|
|
def stop_test(self):
    """Stop a running test at once, discard its data and reset the display.

    Does nothing when no test is running. Log finalisation is deferred to
    finalize_test() via a short timer.
    """
    if not self.test_running:
        return

    # Signal the test thread and mark the test as over.
    self.request_stop = True
    self.test_running = False
    self.measuring = False
    self.test_phase = "Idle"
    self.phase_label.setText(self.test_phase)

    # Put channel A into a non-driving state right away.
    if hasattr(self, 'dev'):
        try:
            channel_a = self.dev.channels['A']
            channel_a.mode = pysmu.Mode.HI_Z
            channel_a.constant(0)
        except Exception as e:
            print(f"Error resetting device: {e}")

    # Drop buffered samples and computed results.
    for series in (self.time_data, self.voltage_data,
                   self.current_data, self.phase_data):
        series.clear()
    self.capacity_ah = 0.0
    self.charge_capacity = 0.0
    self.coulomb_efficiency = 0.0

    self.reset_plot()

    # Back to the "ready" state in the UI.
    self.status_bar.setText("Test stopped - Ready for new test")
    self.stop_button.setEnabled(False)
    self.start_button.setEnabled(True)

    # Finalize test data (logs, etc.) shortly afterwards.
    QTimer.singleShot(100, self.finalize_test)
|
|
|
|
def run_test_sequence(self):
    """Run charge / rest / discharge / rest cycles until the test ends.

    Started by start_test() on a background ``threading.Thread``. One
    cycle is run when continuous mode is off, otherwise cycles repeat until
    the checkbox is cleared or a stop is requested. Each cycle gets its own
    log file; capacities are integrated from the latest buffered current
    reading and the wall-clock time between loop iterations.

    Any exception ends the test: the message is queued for an error dialog
    and finalize_test() is called.

    NOTE(review): this method reads and updates Qt widgets from a non-GUI
    thread. Routing those updates through signals is a larger refactor and
    is intentionally not attempted here.
    """
    try:
        test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text())
        charge_cutoff = float(self.charge_cutoff_input.text())
        discharge_cutoff = float(self.discharge_cutoff_input.text())

        while self.test_running and (self.continuous_mode or self.cycle_count == 0):
            # Reset stop request at start of each cycle
            self.request_stop = False
            self.cycle_count += 1
            self.cycle_label.setText(f"{self.cycle_count}")

            # Create new log file for this cycle
            self.create_cycle_log_file()

            # 1. Charge phase (constant current)
            self.test_phase = "Charge"
            self.phase_label.setText(self.test_phase)
            self.status_bar.setText(f"Charging to {charge_cutoff}V @ {test_current:.3f}A")

            self.measuring = True
            self.dev.channels['B'].mode = pysmu.Mode.HI_Z
            self.dev.channels['A'].mode = pysmu.Mode.SIMV
            self.dev.channels['A'].constant(test_current)
            self.charge_capacity = 0.0
            self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}")
            target_voltage = charge_cutoff
            self.last_update_time = time.time()

            while self.test_running and not self.request_stop:
                # The buffers are filled (and occasionally cleared) by the
                # GUI thread, so an emptiness check followed by [-1] can
                # still fail; treat that as "no data yet".
                try:
                    current_voltage = self.voltage_data[-1]
                    measured_current = abs(self.current_data[-1])
                except IndexError:
                    time.sleep(0.1)
                    continue

                # Update charge capacity (A * s -> Ah)
                now = time.time()
                delta_t = now - self.last_update_time
                self.last_update_time = now
                self.charge_capacity += measured_current * delta_t / 3600
                self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}")

                self.status_bar.setText(
                    f"Charging: {current_voltage:.3f}V / {target_voltage}V | "
                    f"Current: {measured_current:.3f}A | "
                    f"Capacity: {self.charge_capacity:.4f}Ah"
                )

                if current_voltage >= target_voltage or self.request_stop:
                    break

                time.sleep(0.1)

            if self.request_stop or not self.test_running:
                break

            # 2. Rest period after charge
            self.test_phase = "Resting (Post-Charge)"
            self.phase_label.setText(self.test_phase)
            self.measuring = False
            self.dev.channels['A'].mode = pysmu.Mode.HI_Z
            self.dev.channels['A'].constant(0)

            rest_end_time = time.time() + (float(self.rest_time_input.text()) * 3600)
            while time.time() < rest_end_time and self.test_running and not self.request_stop:
                time_left = max(0, rest_end_time - time.time())
                self.status_bar.setText(
                    f"Resting after charge | "
                    f"Time left: {time_left/60:.1f} min"
                )
                time.sleep(1)  # Check every second for stop request

            if self.request_stop or not self.test_running:
                break

            # 3. Discharge phase (capacity measurement)
            self.test_phase = "Discharge"
            self.phase_label.setText(self.test_phase)
            self.status_bar.setText(f"Discharging to {discharge_cutoff}V @ {test_current:.3f}A")

            self.measuring = True
            self.dev.channels['A'].mode = pysmu.Mode.SIMV
            self.dev.channels['A'].constant(-test_current)
            self.capacity_ah = 0.0
            self.capacity_label.setText(f"{self.capacity_ah:.4f}")
            self.last_update_time = time.time()

            while self.test_running and not self.request_stop:
                try:
                    current_voltage = self.voltage_data[-1]
                    current_current = abs(self.current_data[-1])
                except IndexError:
                    time.sleep(0.1)
                    continue

                # Capacity calculation (A * s -> Ah)
                now = time.time()
                delta_t = now - self.last_update_time
                self.last_update_time = now
                self.capacity_ah += current_current * delta_t / 3600
                self.capacity_label.setText(f"{self.capacity_ah:.4f}")

                if not self.continuous_check.isChecked() and self.continuous_mode:
                    # The checkbox was cleared mid-cycle: finish this
                    # discharge, then stop.
                    self.continuous_mode = False
                    self.status_bar.setText(
                        f"Continuous Mode disabled | "
                        f"Discharging to {discharge_cutoff}V (will stop after this cycle) | "
                        f"Current: {current_current:.3f}A | "
                        f"Capacity: {self.capacity_ah:.4f}Ah"
                    )
                else:
                    # Default status message
                    self.status_bar.setText(
                        f"Discharging: {current_voltage:.3f}V / {discharge_cutoff}V | "
                        f"Current: {current_current:.3f}A | "
                        f"Capacity: {self.capacity_ah:.4f}Ah"
                    )

                if current_voltage <= discharge_cutoff or self.request_stop:
                    break

                # Same polling period as the charge loop; without it this
                # loop spins at full speed.
                time.sleep(0.1)

            # A stop during discharge ends the sequence without a summary.
            if self.request_stop or not self.test_running:
                break

            # Cycle wrap-up: done before the single-cycle exit below so the
            # last (or only) cycle also gets its efficiency and summary.
            if self.charge_capacity > 0:
                self.coulomb_efficiency = (self.capacity_ah / self.charge_capacity) * 100
                self.efficiency_label.setText(f"{self.coulomb_efficiency:.1f}")

            self.status_bar.setText(
                f"Cycle {self.cycle_count} complete | "
                f"Discharge: {self.capacity_ah:.3f}Ah | "
                f"Charge: {self.charge_capacity:.3f}Ah | "
                f"Efficiency: {self.coulomb_efficiency:.1f}%"
            )

            # Write cycle summary to log file
            self.write_cycle_summary()

            # Flush rows buffered since the summary, through the writer that
            # owns the open file rather than a second append-mode handle.
            log_file = getattr(self, 'current_cycle_file', None)
            if (log_file is not None and not log_file.closed
                    and getattr(self, 'log_buffer', None)):
                self.log_writer.writerows(self.log_buffer)
                log_file.flush()
                self.log_buffer.clear()

            if not self.continuous_check.isChecked():
                self.test_running = False
                self.test_phase = "Idle"
                self.phase_label.setText(self.test_phase)
                break  # Exit the main test loop

            # 4. Rest period after discharge (only when another cycle follows)
            self.test_phase = "Resting (Post-Discharge)"
            self.phase_label.setText(self.test_phase)
            self.measuring = False
            self.dev.channels['A'].mode = pysmu.Mode.HI_Z
            self.dev.channels['A'].constant(0)

            rest_end_time = time.time() + (float(self.rest_time_input.text()) * 3600)
            while time.time() < rest_end_time and self.test_running and not self.request_stop:
                time_left = max(0, rest_end_time - time.time())
                self.status_bar.setText(
                    f"Resting after discharge | "
                    f"Time left: {time_left/60:.1f} min"
                )
                time.sleep(1)

        # Finalize test if stopped or completed
        self.finalize_test()

    except Exception as e:
        error_msg = str(e)
        QTimer.singleShot(0, lambda: QMessageBox.critical(self, "Test Error", error_msg))
        self.finalize_test()
|
|
|
|
def finalize_test(self):
    """Final cleanup after a test completes, fails or is stopped.

    Puts channel A into a non-driving state, flushes and closes the cycle
    log, restores the buttons and reports the final capacity and cycle
    count in the status line and an information dialog.
    """
    self.measuring = False
    # Without this, a test that ended with an exception leaves test_running
    # True and start_test() silently ignores every later click.
    self.test_running = False

    if hasattr(self, 'dev'):
        try:
            # Same safe state that stop_test() applies.
            self.dev.channels['A'].mode = pysmu.Mode.HI_Z
            self.dev.channels['A'].constant(0)
        except Exception as e:
            print(f"Error resetting device: {e}")

    # Flush and close the current log file. The handle is None between
    # start_test() and the first cycle, and may already have been closed.
    log_file = getattr(self, 'current_cycle_file', None)
    if log_file is not None and not log_file.closed:
        if getattr(self, 'log_buffer', None) and hasattr(self, 'log_writer'):
            try:
                self.log_writer.writerows(self.log_buffer)
                self.log_buffer.clear()
            except Exception as e:
                print(f"Error flushing log buffer: {e}")
        try:
            log_file.close()
        except Exception as e:
            print(f"Error closing log file: {e}")
    # Forget the handle so nothing writes to a closed file afterwards.
    self.current_cycle_file = None

    self.start_button.setEnabled(True)
    self.stop_button.setEnabled(False)
    self.request_stop = False

    message = (
        f"Test safely stopped after discharge phase | "
        f"Cycle {self.cycle_count} completed | "
        f"Final capacity: {self.capacity_ah:.3f}Ah"
    )
    self.status_bar.setText(message)

    QMessageBox.information(
        self,
        "Test Completed",
        f"Test was safely stopped after discharge phase.\n\n"
        f"Final discharge capacity: {self.capacity_ah:.3f}Ah\n"
        f"Total cycles completed: {self.cycle_count}"
    )
|
|
|
|
def reset_plot(self):
    """Reset the plot completely for a new test.

    Empties both traces and the sample buffers, then restores the initial
    axis ranges derived from the current parameter inputs.

    Raises:
        ValueError: if one of the parameter inputs is not a number.
    """
    # Clear the data lines
    self.line_voltage.set_data([], [])
    self.line_current.set_data([], [])

    # Reset the data buffers
    self.time_data.clear()
    self.voltage_data.clear()
    self.current_data.clear()

    # Voltage axis: cutoff window widened by a fixed margin, floored at 0 V
    voltage_padding = 0.2
    min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding)
    max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding
    self.ax.set_xlim(0, 10)  # 10s initial range
    # Previously referenced the undefined name `max_velocity`, which raised
    # NameError on every reset.
    self.ax.set_ylim(min_voltage, max_voltage)

    # Current axis: symmetric around zero, 1.5x the test current plus margin
    current_padding = 0.05
    test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text())
    max_current = test_current * 1.5
    self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)

    # Force redraw
    self.canvas.draw()
|
|
|
|
def write_cycle_summary(self):
    """Append a one-line cycle summary to the current cycle's log file.

    Pending buffered rows are written first so the summary follows them.
    Does nothing when no log file is open; write errors are printed, not
    raised.
    """
    log_file = getattr(self, 'current_cycle_file', None)
    if not log_file:
        return

    summary_line = (
        f"Cycle {self.cycle_count} Summary - "
        f"Discharge={self.capacity_ah:.4f}Ah, "
        f"Charge={self.charge_capacity:.4f}Ah, "
        f"Efficiency={self.coulomb_efficiency:.1f}%"
    )

    try:
        pending_rows = getattr(self, 'log_buffer', None)
        if pending_rows:
            self.log_writer.writerows(pending_rows)
            pending_rows.clear()
        log_file.write(summary_line + "\n")
        log_file.flush()
    except Exception as e:
        print(f"Error writing cycle summary: {e}")
|
|
|
|
def update_plot(self):
    """Redraw both traces, at most about once per second.

    While fewer than ten samples are buffered the redraw is never
    throttled, so a new test shows data immediately.
    """
    if not self.time_data:
        return

    seconds_since_redraw = time.time() - getattr(self, '_last_plot_time', 0)
    throttled = len(self.time_data) >= 10 and not seconds_since_redraw > 1.0
    if throttled:
        return

    for line, series in ((self.line_voltage, self.voltage_data),
                         (self.line_current, self.current_data)):
        line.set_data(self.time_data, series)
    self.auto_scale_axes()
    self.canvas.draw_idle()
    self._last_plot_time = time.time()
|
|
|
|
def auto_scale_axes(self):
    """Fit the axes to the buffered data, within fixed outer limits.

    The x range only grows; the y ranges are re-applied only when they
    differ from the current limits by more than a small threshold.
    """
    if not self.time_data:
        return

    # Time axis: grow once the newest sample passes 95% of the visible
    # range, leaving 5% head-room.
    newest_time = self.time_data[-1]
    visible_until = self.ax.get_xlim()[1]
    if newest_time > visible_until * 0.95:
        grown_limit = newest_time * 1.05
        for axis in (self.ax, self.ax2):
            axis.set_xlim(0, grown_limit)

    # Voltage axis: data range +/- 0.2 V, clamped to 0 .. 5 V.
    if self.voltage_data:
        wanted_low = max(0, min(self.voltage_data) - 0.2)
        wanted_high = min(5.0, max(self.voltage_data) + 0.2)
        shown = self.ax.get_ylim()
        if abs(shown[0] - wanted_low) > 0.1 or abs(shown[1] - wanted_high) > 0.1:
            self.ax.set_ylim(wanted_low, wanted_high)

    # Current axis: data range +/- 0.05 A, clamped to +/- 0.25 A.
    if self.current_data:
        wanted_low = max(-0.25, min(self.current_data) - 0.05)
        wanted_high = min(0.25, max(self.current_data) + 0.05)
        shown = self.ax2.get_ylim()
        if abs(shown[0] - wanted_low) > 0.02 or abs(shown[1] - wanted_high) > 0.02:
            self.ax2.set_ylim(wanted_low, wanted_high)
|
|
|
|
def handle_device_error(self, error_msg):
    """Tear down the session after a device failure and schedule a reconnect.

    Args:
        error_msg: the exception or message describing the failure; it is
            converted with ``str()`` before use.
    """
    error_msg = str(error_msg)
    print(f"Device error: {error_msg}")

    # Release the session first; it is only ended when it was marked active.
    if hasattr(self, 'session'):
        try:
            if self.session_active:
                self.session.end()
            del self.session
        except Exception as exc:
            print(f"Error cleaning up session: {exc}")

    # Show the disconnected state.
    self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
    self.connection_label.setText("Disconnected")
    self.status_bar.setText(f"Device error: {error_msg}")

    # Nothing can run without a device.
    self.session_active = False
    self.test_running = False
    self.continuous_mode = False
    self.measuring = False

    for button in (self.start_button, self.stop_button):
        button.setEnabled(False)

    # Empty the sample buffers and, once the plot exists, the plot as well.
    for series in (self.time_data, self.voltage_data, self.current_data):
        series.clear()
    if all(hasattr(self, name) for name in ('line_voltage', 'line_current')):
        for line in (self.line_voltage, self.line_current):
            line.set_data([], [])
        for axis in (self.ax, self.ax2):
            axis.set_xlim(0, 1)
        self.canvas.draw()

    # Show error message and attempt reconnect automatically
    QTimer.singleShot(100, self.attempt_reconnect)
|
|
|
|
def attempt_reconnect(self):
    """Tell the user about the connection problem, then schedule a reconnect.

    When the dialog itself cannot be shown the error is printed and no
    reconnect is scheduled.
    """
    try:
        QMessageBox.critical(
            self,
            "Device Connection Error",
            "Could not connect to ADALM1000\n\n"
            "1. Check USB cable connection\n"
            "2. The device will attempt to reconnect automatically"
        )
    except Exception as exc:
        print(f"Error showing message: {exc}")
    else:
        # Give the device a second before trying again.
        QTimer.singleShot(1000, self.reconnect_device)
|
|
|
|
def reconnect_device(self):
    """Stop all activity, release the old session and try to connect again.

    init_device() reports its own failures through handle_device_error(),
    which already schedules the next attempt. This method therefore only
    schedules a retry itself when init_device() raises unexpectedly;
    scheduling in both places doubled the number of pending retries on
    every failed attempt.
    """
    self.status_bar.setText("Attempting to reconnect...")

    # Stop any running activity before touching the session.
    self.test_running = False
    self.continuous_mode = False
    self.measuring = False

    # The worker's run() loop blocks the thread's event loop, so it must be
    # told to stop before quit()/wait() can return. Both handles start out
    # as None, hence the explicit checks.
    worker = getattr(self, 'measurement_worker', None)
    if worker is not None:
        worker.stop()
    thread = getattr(self, 'measurement_thread', None)
    if thread is not None:
        thread.quit()
        thread.wait()

    # Clear any existing session
    if hasattr(self, 'session'):
        try:
            if self.session_active:
                self.session.end()
        except Exception as e:
            print(f"Error cleaning up session: {e}")
        finally:
            # Drop the stale handle even if end() failed.
            del self.session

    # Add small delay to allow device to reset
    time.sleep(1.5)

    # Try to initialize device
    try:
        self.init_device()
    except Exception as e:
        print(f"Reconnect failed: {e}")
        self.status_bar.setText("Reconnect failed - will retry...")
        QTimer.singleShot(2000, self.reconnect_device)  # Retry after 2 seconds
        return

    if self.session_active:
        self.status_bar.setText("Reconnected successfully")
    else:
        # The failure path inside init_device() has already queued a retry.
        self.status_bar.setText("Reconnect failed - will retry...")
|
|
|
|
def closeEvent(self, event):
    """Stop background work and release the device when the window closes.

    Args:
        event: the close event; it is always accepted.
    """
    # Set flags to stop all threads
    self.test_running = False
    self.measuring = False
    self.session_active = False

    # Stop the measurement loop first: it blocks the thread's event loop,
    # so quit()/wait() would otherwise never return. Both handles are None
    # until a device has been connected.
    worker = getattr(self, 'measurement_worker', None)
    if worker is not None:
        worker.stop()
    thread = getattr(self, 'measurement_thread', None)
    if thread is not None:
        thread.quit()
        thread.wait()

    # Clean up device session
    session = getattr(self, 'session', None)
    if session:
        try:
            session.end()
        except Exception as e:
            print(f"Error ending session: {e}")

    event.accept()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: one application object, Fusion widget style,
    # one main window, then hand control to the Qt event loop.
    app = QApplication([])
    app.setStyle('Fusion')

    window = BatteryTester()
    window.show()

    app.exec_()