Adalm1000_Battery_SMU/MainCode/adalm1000_logger.py
2025-06-30 16:01:19 +02:00

1280 lines
52 KiB
Python

# -*- coding: utf-8 -*-
import os
import time
import csv
from datetime import datetime
import numpy as np
# Suppress warnings
os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false'
os.environ['LIBUSB_DEBUG'] = '0'
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QGridLayout, QLabel, QPushButton, QLineEdit, QFrame,
QCheckBox, QMessageBox, QFileDialog, QProgressBar)
from PyQt5.QtCore import (Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread,
QMutex, QMutexLocker)
from PyQt5.QtGui import QColor, QPalette
from collections import deque
import pysmu
import matplotlib
matplotlib.use('Qt5Agg')
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
class DeviceDisconnectedError(Exception):
    """Exception type representing a device disconnection event."""
class MeasurementThread(QThread):
    """Background thread that reads the device and emits filtered readings.

    Signals:
        update_signal(float, float, float): filtered voltage, filtered current
            and seconds elapsed since ``start_time``.
        error_signal(str): text of an exception raised inside the read loop.
        status_signal(str): "Measurement started" / "Measurement stopped".
    """
    update_signal = pyqtSignal(float, float, float)  # voltage, current, timestamp
    error_signal = pyqtSignal(str)
    status_signal = pyqtSignal(str)

    def __init__(self, device: object, interval: float = 0.1, start_time: float = None):
        """Store settings and put the device channels into measurement modes.

        Args:
            device: object exposing ``channels['A'/'B']`` and ``read()``.
            interval: target seconds between emitted readings (floored at 0.05).
            start_time: epoch seconds used as time zero; ``None`` means "now".
        """
        super().__init__()
        self._mutex = QMutex()
        self.data_mutex = QMutex()
        self.device = device
        self.interval = max(0.05, interval)
        self._running = False
        self.filter_window_size = 10
        # Compare against None so an explicit 0.0 is honoured as time zero.
        self.start_time = time.time() if start_time is None else start_time
        self.last_update_time = self.start_time
        # Configure channels
        self.device.channels['A'].mode = pysmu.Mode.SIMV  # Channel A for current
        self.device.channels['B'].mode = pysmu.Mode.HI_Z  # Channel B for voltage

    def run(self):
        """Read sample batches until stopped, emitting moving-average values."""
        self._running = True
        window = self.filter_window_size
        # maxlen makes the deque drop the oldest entry automatically.
        voltage_window = deque(maxlen=window)
        current_window = deque(maxlen=window)
        self.status_signal.emit("Measurement started")
        self.last_update_time = time.time()
        while self._running:
            try:
                samples = self.device.read(window, timeout=500)
                if not samples or len(samples) < window:
                    # Short read: try again (read() itself waits up to the timeout).
                    continue
                with QMutexLocker(self._mutex):
                    # Voltage comes from channel B, current from channel A.
                    voltage_window.append(np.mean([s[1][0] for s in samples]))
                    current_window.append(np.mean([s[0][1] for s in samples]))
                    voltage = np.mean(voltage_window)
                    current = np.mean(current_window)
                    current_time = time.time() - self.start_time
                self.update_signal.emit(voltage, current, current_time)
                # Sleep for what is left of the interval, never less than 10 ms.
                elapsed = time.time() - self.last_update_time
                time.sleep(max(0.01, self.interval - elapsed))
                self.last_update_time = time.time()
            except Exception as e:
                self.error_signal.emit(f"Measurement error: {str(e)}")
                break
        self.status_signal.emit("Measurement stopped")
        self._running = False

    def stop(self):
        """Ask the loop to exit; terminate the thread if it is still alive after 300 ms."""
        self._running = False
        if self.isRunning():
            self.quit()
            if not self.wait(300):  # 300ms grace period
                self.terminate()

    def is_active(self) -> bool:
        """Return True while the loop is running and updated within the last 2 s."""
        with QMutexLocker(self._mutex):
            return self._running and (time.time() - self.last_update_time < 2.0)
class TestSequenceThread(QThread):
    """Worker thread that runs the battery test sequence for its parent window.

    Each cycle is: CC charge -> optional CV charge -> rest -> CC discharge,
    followed by a rest before the next cycle when continuous mode is on.
    All settings and shared counters are read from, and written to, the
    object passed as ``parent``.

    Signals:
        progress_updated(float, str): phase progress in [0, 1] and phase name.
        cycle_completed(int, float, float, float): cycle number, discharge
            capacity, charge capacity and their ratio in percent.
        error_occurred(str): text of the exception that aborted the run.
    """
    progress_updated = pyqtSignal(float, str)  # progress, phase
    cycle_completed = pyqtSignal(int, float, float, float)  # cycle, discharge, charge, efficiency
    error_occurred = pyqtSignal(str)

    def __init__(self, parent: QObject):
        """Create the thread as a Qt child of ``parent`` and keep a reference to it."""
        super().__init__(parent)  # Pass parent to QThread
        # NOTE: this attribute shadows QObject.parent(); it is kept because the
        # whole class reaches the owning window through ``self.parent``.
        self.parent = parent
        self._mutex = QMutex()  # For thread operation control
        self.data_mutex = QMutex()  # For data protection
        self._running = False
        self._rest_seconds = 0.0  # filled in by run() from the rest-time input

    def stop(self):
        """Request cooperative termination.

        Only clears the run flag; the phase loops poll it roughly every 100 ms
        and their ``finally`` blocks return channel A to HI_Z. Callers that
        need to block should ``wait()`` afterwards.
        """
        self._running = False

    def _should_stop(self) -> bool:
        """Return True once either this thread or the parent asked to stop."""
        with QMutexLocker(self._mutex):
            return (not self._running) or bool(self.parent.request_stop)

    def run(self):
        """Execute the complete test sequence with configurable modes."""
        self._running = True
        owner = self.parent
        try:
            test_current = float(owner.c_rate_input.text()) * float(owner.capacity_input.text())
            charge_cutoff = float(owner.charge_cutoff_input.text())
            discharge_cutoff = float(owner.discharge_cutoff_input.text())
            cv_cutoff_current = float(owner.cv_cutoff_input.text())
            # The rest-time field is labelled in hours; convert to seconds.
            self._rest_seconds = max(0.0, float(owner.rest_time_input.text()) * 3600.0)

            # In single-cycle mode the loop condition ends the run after one pass.
            while not self._should_stop() and (owner.continuous_mode or owner.cycle_count == 0):
                owner.cycle_count += 1
                cycle = owner.cycle_count

                # CC charge phase
                self._execute_phase("charge", test_current, charge_cutoff,
                                    discharge_cutoff, charge_cutoff)
                if self._should_stop():
                    break

                # Optional CV charge phase
                if owner.cv_mode_enabled:
                    self._execute_cv_charge(charge_cutoff, test_current, cv_cutoff_current)
                    if self._should_stop():
                        break

                # Rest after charge
                self._execute_rest("post-charge")
                if self._should_stop():
                    break

                # CC discharge phase
                self._execute_phase("discharge", test_current, discharge_cutoff,
                                    discharge_cutoff, charge_cutoff)
                if self._should_stop():
                    # An aborted cycle is not reported as completed.
                    break

                # Report the finished cycle (also in single-cycle mode).
                if owner.charge_capacity > 0:
                    efficiency = (owner.capacity_ah / owner.charge_capacity) * 100
                    self.cycle_completed.emit(cycle, owner.capacity_ah,
                                              owner.charge_capacity, efficiency)

                # Rest after discharge only when another cycle will follow.
                if owner.continuous_mode:
                    self._execute_rest("post-discharge")
        except Exception as e:
            # Phase helpers raise with a descriptive prefix; emit exactly once here.
            self.error_occurred.emit(str(e))
        finally:
            self._running = False

    def _execute_rest(self, label: str):
        """Idle for the configured rest time, reporting progress.

        The preceding phase already left channel A in HI_Z (see the
        ``finally`` blocks of the phase methods), so nothing is driven here.

        Args:
            label: short tag shown in the phase name, e.g. "post-charge".
        """
        phase_name = f"Rest ({label})"
        duration = self._rest_seconds
        self.progress_updated.emit(0.0, phase_name)
        started = time.time()
        while not self._should_stop():
            elapsed = time.time() - started
            if elapsed >= duration:
                break
            self.progress_updated.emit(elapsed / duration, phase_name)
            time.sleep(0.1)

    def _execute_cv_charge(self, target_voltage: float, current_limit: float, cutoff_current: float):
        """Execute constant voltage charge phase.

        Args:
            target_voltage: voltage held on channel A.
            current_limit: reference current used only to scale the progress value.
            cutoff_current: the phase ends once |current| drops to this value.

        Raises:
            RuntimeError: wraps any failure, prefixed with "CV Charge error:".
        """
        try:
            self.progress_updated.emit(0.0, "CV Charge")
            # Source voltage / measure current: SVMI is the voltage-source mode
            # (SIMV would interpret the value as a current).
            channel_a = self.parent.dev.channels['A']
            channel_a.mode = pysmu.Mode.SVMI
            channel_a.constant(target_voltage)
            last_update = time.time()
            while not self._should_stop():
                data = self.parent.test_data
                if not data['voltage']:
                    # No measurement has arrived yet.
                    time.sleep(0.1)
                    continue
                present_current = abs(data['current'][-1])
                now = time.time()
                delta_t = now - last_update
                last_update = now
                # Integrate charge in Ah.
                self.parent.charge_capacity += present_current * delta_t / 3600
                # Progress grows as the current tapers off.
                progress = max(0.0, min(1.0, 1 - (present_current / current_limit)))
                self.progress_updated.emit(progress, "CV Charge")
                if present_current <= cutoff_current:
                    break
                time.sleep(0.1)
        except Exception as e:
            raise RuntimeError(f"CV Charge error: {str(e)}") from e
        finally:
            self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z

    def _read_channel_a_current(self) -> float:
        """Return the mean channel A current over 50 freshly read samples."""
        samples = self.parent.dev.read(50, timeout=2000)
        return np.mean([s[0][1] for s in samples])

    def _execute_phase(self, phase: str, current: float, target_voltage: float,
                       discharge_cutoff: float, charge_cutoff: float):
        """Run one constant-current phase until the target voltage is reached.

        Args:
            phase: "charge" sources +current; any other value sources -current.
            current: magnitude of the current set on channel A.
            target_voltage: voltage that ends the phase.
            discharge_cutoff: lower voltage bound, used to scale progress.
            charge_cutoff: upper voltage bound, used to scale progress.

        Raises:
            RuntimeError: wraps any failure, prefixed with "<Phase> error:".
        """
        charging = phase == "charge"
        signed_current = current if charging else -current
        try:
            channel_a = self.parent.dev.channels['A']
            print(f"\n=== Starting {phase} phase ===")
            print(f"Device session active: {self.parent.session_active}")
            print(f"Channel A mode before: {channel_a.mode}")

            # Reset the channel first and let it settle.
            channel_a.mode = pysmu.Mode.HI_Z
            channel_a.constant(0)
            time.sleep(1.0)

            # Switch to current-source mode.
            channel_a.mode = pysmu.Mode.SIMV
            time.sleep(0.5)

            if charging:
                print(f"Starting CHARGE at {current}A to {target_voltage}V")
            else:
                print(f"Starting DISCHARGE at {-current}A to {target_voltage}V")
            channel_a.constant(signed_current)
            time.sleep(1.0)
            print(f"Set constant current to: {current} A on channel A")

            # Verify that current is actually flowing.
            measured_current = self._read_channel_a_current()
            print(f"Requested {current}A, Measured {measured_current:.6f}A")
            print(f"Channel A mode: {channel_a.mode}")

            if abs(measured_current) < 0.001:
                print("Warning: Current not being applied - checking connection")
                # Retry once by cycling the channel mode.
                channel_a.mode = pysmu.Mode.HI_Z
                time.sleep(0.5)
                channel_a.mode = pysmu.Mode.SIMV
                time.sleep(0.5)
                channel_a.constant(signed_current)
                time.sleep(1.0)
                measured_current = self._read_channel_a_current()
                print(f"After reset: Requested {current}A, Measured {measured_current:.6f}A")
                if abs(measured_current) < 0.001:
                    raise RuntimeError(f"Failed to apply {current}A - check device connection and battery")

            # Main phase loop
            last_update = time.time()
            while not self._should_stop():
                data = self.parent.test_data
                if not data['voltage']:
                    time.sleep(0.1)
                    continue
                present_voltage = data['voltage'][-1]
                now = time.time()
                delta_t = now - last_update
                last_update = now
                delivered_ah = abs(data['current'][-1]) * delta_t / 3600
                if charging:
                    self.parent.charge_capacity += delivered_ah
                    progress = (present_voltage - discharge_cutoff) / (target_voltage - discharge_cutoff)
                    reached = present_voltage >= target_voltage
                else:
                    self.parent.capacity_ah += delivered_ah
                    progress = (charge_cutoff - present_voltage) / (charge_cutoff - target_voltage)
                    reached = present_voltage <= target_voltage
                progress = max(0.0, min(1.0, progress))
                # NOTE(review): this yields "Chargeing"/"Dischargeing"; the text is
                # left unchanged in case other code matches on it - confirm and fix.
                self.progress_updated.emit(progress, f"{phase.capitalize()}ing")
                if reached:
                    break
                time.sleep(0.1)
        except AttributeError as e:
            if '_constant' in str(e):
                # Known harmless attribute check failure: the phase is abandoned quietly.
                print("Warning: Internal attribute check failed, but current setting should still work")
            else:
                raise RuntimeError(f"{phase.capitalize()} error: {str(e)}") from e
        except Exception as e:
            raise RuntimeError(f"{phase.capitalize()} error: {str(e)}") from e
        finally:
            # Best-effort: always leave channel A undriven.
            try:
                self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z
                self.parent.dev.channels['A'].constant(0)
            except Exception:
                pass
class BatteryTester(QMainWindow):
"""Main application window for battery capacity testing."""
error_signal = pyqtSignal(str)
def __init__(self):
    """Set up window state, the log directory and the UI, then schedule the device connect."""
    super().__init__()
    self.error_signal.connect(self.handle_device_error)

    # Locks: one general purpose, one for the recorded test data
    self._mutex = QMutex()
    self.data_mutex = QMutex()

    # One deque per recorded series ('mode' holds CC/CV etc., 'phase' holds
    # charge/discharge/rest)
    series_names = ('time', 'voltage', 'current', 'mode', 'phase', 'capacity', 'energy')
    self.test_data = {name: deque() for name in series_names}

    # Test bookkeeping
    self.test_phase = "Ready"
    self.current_mode = "None"
    # discharge capacity, charge capacity, energy (Wh) and efficiency start at zero
    self.capacity_ah = self.charge_capacity = self.energy_wh = self.coulomb_efficiency = 0.0
    self.cycle_count = 0
    self.cycle_data = []

    # Theme colours (cv_color is the extra colour for CV mode)
    theme = (
        ('bg_color', (46, 52, 64)),
        ('fg_color', (216, 222, 233)),
        ('accent_color', (94, 129, 172)),
        ('warning_color', (191, 97, 106)),
        ('success_color', (163, 190, 140)),
        ('cv_color', (143, 188, 187)),
    )
    for attr_name, rgb in theme:
        setattr(self, attr_name, QColor(*rgb))

    # Device / test flags all start cleared
    self.session_active = self.measuring = self.test_running = False
    self.continuous_mode = self.cv_mode_enabled = self.request_stop = False
    self.interval = 0.1  # measurement interval

    # Logging
    self.log_dir = os.path.expanduser("~/adalm1000/logs")
    os.makedirs(self.log_dir, exist_ok=True)
    self.start_time = time.time()
    self.log_buffer = []
    self.current_cycle_file = None

    # Worker threads are created on demand
    self.measurement_thread = None
    self.test_thread = None

    self._setup_ui()

    # Connect to the device shortly after construction to avoid USB issues
    QTimer.singleShot(100, self.safe_init_device)
def _setup_ui(self):
    """Build all widgets of the main window.

    Creates, top to bottom: header (title, connection indicator, connect
    button), measurement read-outs, progress bar, parameter inputs with
    the test buttons, the plot and a status label. Widgets used by other
    methods are stored as attributes on ``self``.
    """
    self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)")
    self.resize(1000, 800)
    self.setMinimumSize(800, 700)

    # Colour palette
    palette = self.palette()
    for role, color in (
        (QPalette.Window, self.bg_color),
        (QPalette.WindowText, self.fg_color),
        (QPalette.Base, QColor(59, 66, 82)),
        (QPalette.Text, self.fg_color),
        (QPalette.Button, self.accent_color),
        (QPalette.ButtonText, self.fg_color),
    ):
        palette.setColor(role, color)
    self.setPalette(palette)

    # Main widget and layout
    self.central_widget = QWidget()
    self.setCentralWidget(self.central_widget)
    self.main_layout = QVBoxLayout(self.central_widget)
    self.main_layout.setContentsMargins(10, 10, 10, 10)
    self.main_layout.setSpacing(10)

    # --- Header -----------------------------------------------------------
    header_frame = QWidget()
    header_layout = QHBoxLayout(header_frame)
    header_layout.setContentsMargins(0, 0, 0, 0)
    self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)")
    self.title_label.setStyleSheet(
        f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};")
    header_layout.addWidget(self.title_label, 1)
    self.connection_label = QLabel("Disconnected")
    header_layout.addWidget(self.connection_label)
    self.status_light = QLabel()
    self.status_light.setFixedSize(20, 20)
    self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
    header_layout.addWidget(self.status_light)
    self.reconnect_btn = QPushButton("Connect")
    self.reconnect_btn.clicked.connect(self.reconnect_device)
    header_layout.addWidget(self.reconnect_btn)
    self.main_layout.addWidget(header_frame)

    # --- Measurement read-outs ---------------------------------------------
    display_frame = QFrame()
    display_frame.setFrameShape(QFrame.StyledPanel)
    display_layout = QGridLayout(display_frame)
    display_layout.setHorizontalSpacing(15)
    display_layout.setVerticalSpacing(8)
    # (caption, unit, attribute that receives the value label)
    measurement_fields = [
        ("Voltage", "V", "voltage_label"),
        ("Current", "A", "current_label"),
        ("Test Phase", "", "phase_label"),
        ("Elapsed Time", "s", "time_label"),
        ("Discharge Capacity", "Ah", "capacity_label"),
        ("Charge Capacity", "Ah", "charge_capacity_label"),
        ("Coulomb Efficiency", "%", "efficiency_label"),
        ("Cycle Count", "", "cycle_label"),
    ]
    self.value_labels = {}  # caption -> value label
    for index, (caption, unit, attr_name) in enumerate(measurement_fields):
        # Two read-outs per row; each occupies three columns: name | value | unit.
        row, slot = divmod(index, 2)
        col = slot * 3
        name_label = QLabel(caption + ":")
        name_label.setStyleSheet("font-size: 11px;")
        display_layout.addWidget(name_label, row, col)
        value_label = QLabel("0.000")
        value_label.setStyleSheet("font-size: 12px; font-weight: bold; min-width: 60px;")
        display_layout.addWidget(value_label, row, col + 1)
        unit_label = QLabel(unit)
        unit_label.setStyleSheet("font-size: 11px; color: gray;")
        display_layout.addWidget(unit_label, row, col + 2)
        self.value_labels[caption] = value_label
        setattr(self, attr_name, value_label)
    self.main_layout.addWidget(display_frame)

    # --- Progress bar -------------------------------------------------------
    self.progress_bar = QProgressBar()
    self.progress_bar.setRange(0, 100)
    self.progress_bar.setTextVisible(False)
    self.progress_bar.setStyleSheet(f"""
        QProgressBar {{
            border: 1px solid {self.fg_color.name()};
            border-radius: 5px;
            text-align: center;
        }}
        QProgressBar::chunk {{
            background-color: {self.accent_color.name()};
        }}
    """)
    self.main_layout.addWidget(self.progress_bar)

    # --- Controls: parameters + buttons -------------------------------------
    controls_frame = QWidget()
    controls_layout = QHBoxLayout(controls_frame)
    controls_layout.setContentsMargins(0, 0, 0, 0)

    params_frame = QFrame()
    params_frame.setFrameShape(QFrame.StyledPanel)
    params_layout = QGridLayout(params_frame)
    # (attribute, caption, default text, width, tooltip, grid row, label column)
    parameter_fields = [
        ("cv_cutoff_input", "CV Cutoff Current (A):", "0.02", 80,
         "Current at which CV charging should stop", 4, 0),
        ("capacity_input", "Battery Capacity (Ah):", "0.2", 80,
         "Nominal capacity of the battery in Amp-hours", 0, 0),
        ("charge_cutoff_input", "Charge Cutoff (V):", "1.43", 80,
         "Voltage at which charging should stop", 1, 0),
        ("discharge_cutoff_input", "Discharge Cutoff (V):", "0.9", 80,
         "Voltage at which discharging should stop", 2, 0),
        ("rest_time_input", "Rest Time (hours):", "0.25", 80,
         "Rest period between charge/discharge cycles", 3, 0),
        ("c_rate_input", "Test C-Rate:", "0.1", 60,
         "Charge/discharge rate relative to battery capacity (e.g., 0.2 for C/5)", 0, 2),
    ]
    for attr_name, caption, default, width, tooltip, row, col in parameter_fields:
        params_layout.addWidget(QLabel(caption), row, col)
        field = QLineEdit(default)
        field.setFixedWidth(width)
        field.setToolTip(tooltip)
        params_layout.addWidget(field, row, col + 1)
        setattr(self, attr_name, field)
    params_layout.addWidget(QLabel("(e.g., 0.2 for C/5)"), 0, 4)
    controls_layout.addWidget(params_frame, 1)

    button_frame = QWidget()
    button_layout = QVBoxLayout(button_frame)
    button_layout.setContentsMargins(0, 0, 0, 0)

    def action_button_style(color):
        # Shared look for the start/stop buttons; only the base colour differs.
        return f"""
            QPushButton {{
                background-color: {color.name()};
                font-weight: bold;
                padding: 8px;
                border-radius: 5px;
            }}
            QPushButton:disabled {{
                background-color: #4C566A;
            }}
        """

    self.start_button = QPushButton("START TEST")
    self.start_button.clicked.connect(self.start_test)
    self.start_button.setStyleSheet(action_button_style(self.accent_color))
    self.start_button.setEnabled(False)  # enabled once a device is connected
    button_layout.addWidget(self.start_button)

    self.stop_button = QPushButton("STOP TEST")
    self.stop_button.clicked.connect(self.stop_test)
    self.stop_button.setStyleSheet(action_button_style(self.warning_color))
    self.stop_button.setEnabled(False)
    button_layout.addWidget(self.stop_button)

    self.continuous_check = QCheckBox("Continuous Mode")
    self.continuous_check.setChecked(True)
    self.continuous_check.setToolTip("Run multiple charge/discharge cycles until stopped")
    button_layout.addWidget(self.continuous_check)

    # Added exactly once, below the continuous-mode box. Adding the same
    # widget to a layout twice makes Qt warn and move it to the end, so this
    # is the position it always ended up in.
    self.cv_mode_check = QCheckBox("Enable CV Charge")
    self.cv_mode_check.setChecked(False)
    self.cv_mode_check.setToolTip("Enable constant voltage charge phase after CC charge")
    button_layout.addWidget(self.cv_mode_check)

    controls_layout.addWidget(button_frame)
    self.main_layout.addWidget(controls_frame)

    # --- Plot ---------------------------------------------------------------
    self._setup_plot()
    self.main_layout.addWidget(self.plot_widget, 1)

    # --- Status line --------------------------------------------------------
    self.status_bar = QLabel("Ready")
    self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;")
    self.main_layout.addWidget(self.status_bar)
def _setup_plot(self):
    """Create the matplotlib figure with a voltage axis and a twin current axis."""
    text_color = self.fg_color.name()
    voltage_color = '#00BFFF'

    self.plot_widget = QWidget()
    container_layout = QVBoxLayout(self.plot_widget)

    self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440')
    self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
    self.ax = self.fig.add_subplot(111)
    self.ax.set_facecolor('#3B4252')

    # Left axis: voltage, spanning the cutoff inputs plus a 0.2 V margin
    margin_v = 0.2
    lower_v = max(0, float(self.discharge_cutoff_input.text()) - margin_v)
    upper_v = float(self.charge_cutoff_input.text()) + margin_v
    self.ax.set_ylim(lower_v, upper_v)
    voltage_lines = self.ax.plot([], [], color=voltage_color, label='Voltage (V)', linewidth=2)
    self.line_voltage = voltage_lines[0]
    self.ax.set_ylabel("Voltage (V)", color=voltage_color)
    self.ax.tick_params(axis='y', labelcolor=voltage_color)

    # Right axis: current, symmetric around zero at 1.5x the test current
    self.ax2 = self.ax.twinx()
    margin_a = 0.05
    nominal_current = float(self.c_rate_input.text()) * float(self.capacity_input.text())
    peak_current = nominal_current * 1.5
    self.ax2.set_ylim(-peak_current - margin_a, peak_current + margin_a)
    current_lines = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
    self.line_current = current_lines[0]
    self.ax2.set_ylabel("Current (A)", color='r')
    self.ax2.tick_params(axis='y', labelcolor='r')

    # Shared decorations
    self.ax.set_xlabel('Time (s)', color=text_color)
    self.ax.set_title('Battery Test (CC)', color=text_color)
    self.ax.tick_params(axis='x', colors=text_color)
    self.ax.grid(True, color='#4C566A')

    # One legend per axis, pinned to opposite top corners
    self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
    self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))

    # Embed the figure in the Qt widget
    self.canvas = FigureCanvas(self.fig)
    container_layout.addWidget(self.canvas)
def safe_init_device(self):
    """Run init_device() and route any exception to handle_device_error()."""
    try:
        self.init_device()
    except Exception as init_error:
        message = str(init_error)
        self.handle_device_error(message)
def init_device(self):
    """Open a pysmu session on the first detected device and start measuring.

    Cleans up any previous session first, resets both channels to HI_Z,
    starts the session, updates the connection indicators and launches the
    measurement thread.

    Raises:
        Exception: "Device initialization failed: ..." with the original
            error attached as ``__cause__``.
    """
    # Temporarily enable USB debugging
    os.environ['LIBUSB_DEBUG'] = '3'  # Set to 0 in production
    self.cleanup_device()
    try:
        print("Waiting before initializing session...")
        time.sleep(1.5)  # Delay helps avoid "device busy" issues
        self.session = pysmu.Session()
        # Log detected devices
        print(f"Devices found: {self.session.devices}")
        if not self.session.devices:
            raise Exception("No ADALM1000 detected - check USB connection")
        self.dev = self.session.devices[0]

        # Reset channels
        self.dev.channels['A'].mode = pysmu.Mode.HI_Z
        self.dev.channels['B'].mode = pysmu.Mode.HI_Z
        self.dev.channels['A'].constant(0)
        self.dev.channels['B'].constant(0)
        self.session.start(0)

        # Update UI
        self.status_light.setStyleSheet("background-color: green; border-radius: 10px;")
        self.connection_label.setText("Connected")
        self.status_bar.setText("Device connected | Ready for measurement")
        self.session_active = True
        self.start_button.setEnabled(True)

        # Start measurement thread
        self.start_measurement_thread()
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise Exception(f"Device initialization failed: {str(e)}") from e
def cleanup_device(self):
    """Stop worker threads, end the device session and reset state and UI.

    Best effort: every failure is printed and cleanup continues.
    """
    print("Cleaning up device session...")

    # Raise the stop flag first: the test thread polls it, so it can wind
    # down while we wait below.
    self.request_stop = True

    # Stop threads with timeout
    for thread in (self.measurement_thread, getattr(self, 'test_thread', None)):
        if thread is None or not thread.isRunning():
            continue
        thread_name = thread.__class__.__name__
        try:
            # Not every worker class is guaranteed to provide stop().
            stop = getattr(thread, 'stop', None)
            if callable(stop):
                stop()
            if not thread.wait(800):
                thread.terminate()
                print(f"Warning: {thread_name} required termination")
        except Exception as e:
            print(f"Error stopping {thread_name}: {str(e)}")

    # Clean up session if it exists
    if getattr(self, 'session', None):
        try:
            if self.session_active:
                # Quick channel reset if device exists
                if hasattr(self, 'dev'):
                    try:
                        self.dev.channels['A'].mode = pysmu.Mode.HI_Z
                        self.dev.channels['B'].mode = pysmu.Mode.HI_Z
                    except Exception:
                        pass  # Best effort cleanup
                self.session.end()
            del self.session
        except Exception as e:
            print(f"Session cleanup error: {str(e)}")
        finally:
            self.session_active = False

    # Reset all states and UI
    self.measuring = False
    self.test_running = False
    self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
    self.connection_label.setText("Disconnected")
    self.start_button.setEnabled(False)
    self.stop_button.setEnabled(False)
def start_measurement_thread(self):
    """Replace any existing measurement thread with a fresh, started one."""
    if getattr(self, 'dev', None) is None:
        print("Device not initialized, cannot start measurement thread")
        return

    previous = self.measurement_thread
    if previous is not None:
        # Stop the old worker and give it up to 500 ms to finish
        previous.stop()
        previous.wait(500)

    self.measurement_thread = worker = MeasurementThread(
        device=self.dev,
        interval=self.interval,
        start_time=self.start_time,
    )
    worker.update_signal.connect(self.update_measurements)
    worker.error_signal.connect(self.handle_device_error)
    worker.start()
def reconnect_device(self):
    """Tear down the current session and schedule a new connection attempt."""
    retry_delay_ms = 2000
    self.status_bar.setText("Attempting to reconnect...")
    self.cleanup_device()
    # Retry after a delay rather than immediately
    QTimer.singleShot(retry_delay_ms, self.safe_init_device)
def handle_device_error(self, error_msg):
    """Report a device error and clean up, always on this object's thread.

    When called from another thread the message is re-emitted through
    ``error_signal`` (connected back to this method) and the call returns.

    Args:
        error_msg: error description; converted with ``str()`` when forwarded.
    """
    try:
        # Ensure this runs in the main thread
        if QThread.currentThread() != self.thread():
            self.error_signal.emit(str(error_msg))
            return

        print(f"Device error: {error_msg}")

        def report_to_user():
            self.status_bar.setText(f"Device error: {error_msg}")
            if self.isVisible():
                QMessageBox.critical(
                    self,
                    "Device Error",
                    f"Device error occurred:\n{error_msg}\n\n"
                    "1. Check USB connection\n"
                    "2. Try manual reconnect\n"
                    "3. Restart application if problems persist"
                )

        def recover():
            # Release the device before the modal dialog blocks.
            self.cleanup_device()
            report_to_user()

        # Defer to the event loop so the caller returns first.
        QTimer.singleShot(0, recover)
    except Exception as e:
        print(f"Error in error handler: {str(e)}")
        try:
            self.cleanup_device()
        except Exception as cleanup_error:
            # Still best effort, but no longer silent.
            print(f"Cleanup after error failed: {cleanup_error}")
def start_test(self):
    """Validate the inputs, reset state, open the log file and start the test thread.

    Does nothing when a test is already running. Input and file problems
    are shown in a message box and abort the start. Any other failure
    calls ``finalize_test(show_message=False)`` before being reported.
    """
    if self.test_running:
        return

    # 1. Read and validate the inputs. No lock is held here: a message box
    #    runs a nested event loop, and data_mutex is not recursive.
    try:
        capacity = float(self.capacity_input.text())
        charge_cutoff = float(self.charge_cutoff_input.text())
        discharge_cutoff = float(self.discharge_cutoff_input.text())
        c_rate = float(self.c_rate_input.text())
        cv_cutoff = float(self.cv_cutoff_input.text())
        if capacity <= 0:
            raise ValueError("Battery capacity must be positive")
        if charge_cutoff <= discharge_cutoff:
            raise ValueError("Charge cutoff must be higher than discharge cutoff")
        if c_rate <= 0:
            raise ValueError("C-rate must be positive")
        if self.cv_mode_check.isChecked() and cv_cutoff <= 0:
            # |current| <= cutoff could never become true, so CV would not end.
            raise ValueError("CV cutoff current must be positive")
        test_current = c_rate * capacity
        if test_current > 0.2:
            raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
    except ValueError as e:
        QMessageBox.critical(self, "Input Error", str(e))
        return

    try:
        # 2. Reset recorded data and counters (the only step that needs the lock).
        with QMutexLocker(self.data_mutex):
            for series in self.test_data.values():
                series.clear()
            self.capacity_ah = 0.0
            self.charge_capacity = 0.0
            self.coulomb_efficiency = 0.0
            self.cycle_count = 0

        # 3. Restart timing and the plot.
        self.start_time = time.time()
        self.time_label.setText("00:00:00")
        self.reset_plot()

        # 4. Create the log file and write its header.
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv")
            self.current_cycle_file = open(self.filename, 'w', newline='')
            self.log_writer = csv.writer(self.current_cycle_file)
            self.log_writer.writerow([
                'Time(s)', 'Voltage(V)', 'Current(A)', 'Mode',
                'Phase', 'Discharge(Ah)', 'Charge(Ah)', 'Efficiency(%)',
                'Cycle'
            ])
            self.current_cycle_file.flush()  # header reaches disk right away
            self.log_buffer = []
        except Exception as e:
            # Do not leave a half-initialised handle open.
            handle = getattr(self, 'current_cycle_file', None)
            if handle is not None and not handle.closed:
                handle.close()
            QMessageBox.critical(
                self,
                "File Error",
                f"Failed to initialize log file:\n{str(e)}\n\n"
                f"Check directory permissions:\n{self.log_dir}"
            )
            return

        # 5. Flip the state flags, update the UI and launch the worker.
        try:
            self.test_running = True
            self.request_stop = False
            self.test_phase = "Initializing"
            self.current_mode = "CC"
            self.continuous_mode = self.continuous_check.isChecked()
            self.cv_mode_enabled = self.cv_mode_check.isChecked()

            self.start_button.setEnabled(False)
            self.stop_button.setEnabled(True)
            self.status_bar.setText(
                f"Test started | Target: {discharge_cutoff}V @ {test_current:.3f}A | "
                f"CV Mode: {'ON' if self.cv_mode_enabled else 'OFF'}"
            )

            # Guard against a leftover worker from a previous run.
            if self.test_thread and self.test_thread.isRunning():
                self.test_thread.stop()
            self.test_thread = TestSequenceThread(self)
            self.test_thread.progress_updated.connect(self.update_test_progress)
            self.test_thread.cycle_completed.connect(self.update_cycle_stats)
            self.test_thread.error_occurred.connect(self.handle_device_error)
            self.test_thread.finished.connect(self.finalize_test)
            self.test_thread.start()
        except Exception as e:
            self.finalize_test(show_message=False)
            QMessageBox.critical(
                self,
                "Test Error",
                f"Failed to start test thread:\n{str(e)}"
            )
    except Exception as e:
        self.finalize_test(show_message=False)
        QMessageBox.critical(
            self,
            "Critical Error",
            f"Unexpected error in test initialization:\n{str(e)}"
        )
def update_test_progress(self, progress: float, phase: str):
    """Show the given phase name and set the progress bar to progress * 100."""
    self.test_phase = phase
    self.phase_label.setText(phase)
    percent = int(progress * 100)
    self.progress_bar.setValue(percent)
def update_cycle_stats(self, cycle: int, discharge: float, charge: float, efficiency: float):
    """Store the cycle results, refresh the read-outs and write the cycle summary."""
    self.cycle_count = cycle
    self.capacity_ah = discharge
    self.charge_capacity = charge
    self.coulomb_efficiency = efficiency

    # Read-out labels and the text each one should show
    readouts = (
        (self.cycle_label, f"{cycle}"),
        (self.capacity_label, f"{discharge:.4f}"),
        (self.charge_capacity_label, f"{charge:.4f}"),
        (self.efficiency_label, f"{efficiency:.1f}"),
    )
    for label, text in readouts:
        label.setText(text)

    summary = " | ".join([
        f"Cycle {cycle} completed",
        f"Discharge: {discharge:.3f}Ah",
        f"Charge: {charge:.3f}Ah",
        f"Efficiency: {efficiency:.1f}%",
    ])
    self.status_bar.setText(summary)

    # Persist the per-cycle summary
    self.write_cycle_summary()
def stop_test(self):
    """Abort the running test: flag the stop, idle channel A, reset the UI, finalize."""
    if not self.test_running:
        return

    # Flag the stop and return the phase display to idle
    self.request_stop = True
    self.test_running = False
    self.measuring = False
    self.test_phase = "Ready"
    self.phase_label.setText(self.test_phase)

    # Put channel A back to high impedance, if a device was ever opened
    if hasattr(self, 'dev'):
        try:
            channel_a = self.dev.channels['A']
            channel_a.mode = pysmu.Mode.HI_Z
            channel_a.constant(0)
        except Exception as reset_error:
            print(f"Error resetting device: {reset_error}")

    # Update UI
    self.status_bar.setText("Test stopped - Ready for new test")
    self.stop_button.setEnabled(False)
    self.start_button.setEnabled(True)

    self.finalize_test(show_message=False)
def finalize_test(self, show_message: bool = True):
    """Flush logs, close the cycle file, stop the worker and reset test state.

    Every cleanup stage is guarded separately: a failure in one stage is
    recorded in ``errors`` and the remaining stages still run. Whatever
    happens, ``self.log_buffer`` is emptied at the end.

    Args:
        show_message: When True and the window is visible, show a
            "Test Complete" dialog listing the collected issues (if any)
            and the cycle count.
    """
    # Human-readable notes about non-fatal cleanup problems.
    errors = []
    try:
        # 1. Write any rows still waiting in the log buffer.
        if getattr(self, 'log_writer', None) is not None:
            try:
                if getattr(self, 'log_buffer', None):
                    try:
                        self.log_writer.writerows(self.log_buffer)
                        self.log_buffer.clear()
                    except Exception as e:
                        errors.append(f"Failed to write log buffer: {str(e)}")
            except Exception as e:
                errors.append(f"Log buffer access error: {str(e)}")

        # 2. Sync and close the cycle file, then drop the references to it.
        if getattr(self, 'current_cycle_file', None) is not None:
            try:
                cycle_file = self.current_cycle_file
                if not cycle_file.closed:
                    try:
                        cycle_file.flush()
                        os.fsync(cycle_file.fileno())
                    except Exception as e:
                        errors.append(f"Failed to sync file: {str(e)}")
                    finally:
                        # Close even when the sync failed.
                        cycle_file.close()
            except Exception as e:
                errors.append(f"File closure error: {str(e)}")
            finally:
                # The writer wraps the file that was just closed, so both
                # attributes are removed together.
                if hasattr(self, 'current_cycle_file'):
                    del self.current_cycle_file
                if hasattr(self, 'log_writer'):
                    del self.log_writer

        # 3. Stop the worker thread (if still running) and forget it.
        try:
            if hasattr(self, 'test_thread') and self.test_thread is not None:
                if self.test_thread.isRunning():
                    self.test_thread.stop()
                    if not self.test_thread.wait(500):  # 500ms timeout
                        errors.append("Test thread didn't stop cleanly")
                self.test_thread = None
        except Exception as e:
            errors.append(f"Thread cleanup error: {str(e)}")

        # 4. Reset run flags and button states.
        try:
            self.test_running = False
            self.request_stop = True
            self.measuring = False
            self.stop_button.setEnabled(False)
            self.start_button.setEnabled(True)
        except Exception as e:
            errors.append(f"State reset error: {str(e)}")

        # 5. Optional completion dialog.
        if show_message and self.isVisible():
            try:
                msg = "Test completed"
                if errors:
                    msg += "\n\nMinor issues occurred:\n- " + "\n- ".join(errors)
                QMessageBox.information(
                    self,
                    "Test Complete",
                    f"{msg}\nCycles: {self.cycle_count}",
                    QMessageBox.Ok
                )
            except Exception as e:
                print(f"Failed to show completion message: {str(e)}")
    except Exception as e:
        # Something escaped the per-stage guards: report it, then make a
        # last attempt to close the file and reset the basic flags.
        print(f"Critical error in finalize_test: {str(e)}")
        try:
            leftover_file = getattr(self, 'current_cycle_file', None)
            if leftover_file is not None:
                leftover_file.close()
        except Exception as close_error:
            # Catch Exception rather than everything so KeyboardInterrupt
            # and SystemExit still propagate.
            print(f"Failed to close cycle file: {close_error}")
        # Ensure basic state is reset
        self.test_running = False
        self.request_stop = True
        self.measuring = False
    finally:
        # Final cleanup guarantee: never carry stale rows into the next test.
        try:
            pending_rows = getattr(self, 'log_buffer', None)
            if pending_rows is not None:
                pending_rows.clear()
        except Exception as clear_error:
            print(f"Failed to clear log buffer: {clear_error}")
def update_measurements(self, voltage: float, current: float, current_time: float):
    """Record one sample, refresh the readouts and log it while a test runs.

    Under ``self._mutex`` the sample is appended to the ``test_data``
    series, the voltage/current/time labels are updated, the plot is
    redrawn on every tenth sample and, while a test is running with a log
    writer available, a row is queued and written out in blocks of ten.

    Args:
        voltage: Measured voltage for this sample.
        current: Measured current for this sample.
        current_time: Timestamp of the sample; it is passed to
            ``format_time`` and logged with millisecond precision.
    """
    if not hasattr(self, '_mutex'):
        print("Critical Error: Mutex not initialized")
        return
    try:
        with QMutexLocker(self._mutex):
            # Limit data points: drop the oldest entry of any full series.
            max_points = 10000
            for series in self.test_data.values():
                if len(series) > max_points:
                    series.popleft()

            # Store data
            self.test_data['time'].append(current_time)
            self.test_data['voltage'].append(voltage)
            self.test_data['current'].append(current)
            self.test_data['phase'].append(self.test_phase)
            self.test_data['mode'].append(self.current_mode)

            # Update UI
            self.voltage_label.setText(f"{voltage:.4f}")
            self.current_label.setText(f"{current:.4f}")
            self.time_label.setText(self.format_time(current_time))

            # Update plot periodically. A running sample counter is used
            # instead of the series length: once the series are capped the
            # length no longer changes, so a length-based check would stop
            # triggering redraws.
            self._plot_sample_count = getattr(self, '_plot_sample_count', 0) + 1
            if self._plot_sample_count % 10 == 0:
                self.update_plot()

            # Log data if test is running
            if self.test_running and getattr(self, 'log_writer', None) is not None:
                self.log_buffer.append([
                    f"{current_time:.3f}",
                    f"{voltage:.6f}",
                    f"{current:.6f}",
                    self.test_phase,
                    f"{self.capacity_ah:.4f}",
                    f"{self.charge_capacity:.4f}",
                    f"{self.coulomb_efficiency:.1f}",
                    f"{self.cycle_count}"
                ])
                # Write data in blocks
                if len(self.log_buffer) >= 10:
                    try:
                        self.log_writer.writerows(self.log_buffer)
                        if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
                            self.current_cycle_file.flush()
                        self.log_buffer.clear()
                    except Exception as e:
                        # Rows stay in the buffer and are retried with the
                        # next sample.
                        print(f"Log write error: {e}")
    except Exception as e:
        print(f"Error in update_measurements: {str(e)}")
def write_cycle_summary(self):
    """Append a one-line summary of the current cycle to its log file.

    Does nothing when no cycle file is open. Pending rows in
    ``self.log_buffer`` are written first so the summary follows the data
    it describes. Failures are printed rather than raised.
    """
    if not getattr(self, 'current_cycle_file', None):
        return
    try:
        # The statistics are plain numbers and are formatted directly.
        summary_line = (
            f"Cycle {self.cycle_count} Summary - "
            f"Discharge={self.capacity_ah:.4f}Ah, "
            f"Charge={self.charge_capacity:.4f}Ah, "
            f"Efficiency={self.coulomb_efficiency:.1f}%"
        )
        # Flush buffered rows before the summary line.
        if self.log_buffer:
            self.log_writer.writerows(self.log_buffer)
            self.log_buffer.clear()
        self.current_cycle_file.write(summary_line + "\n")
        self.current_cycle_file.flush()
    except Exception as e:
        print(f"Error writing cycle summary: {e}")
def update_plot(self):
    """Redraw the voltage and current traces from ``self.test_data``.

    Returns immediately when no time samples exist. Otherwise both lines
    receive fresh list copies of the stored series, the axes are rescaled
    via ``auto_scale_axes`` and a canvas redraw is requested.
    """
    samples = self.test_data
    if not samples['time']:
        return

    # Each trace gets its own copies of the x and y data.
    self.line_voltage.set_data(list(samples['time']), list(samples['voltage']))
    self.line_current.set_data(list(samples['time']), list(samples['current']))

    self.auto_scale_axes()
    self.canvas.draw_idle()
def auto_scale_axes(self):
    """Fit the plot axes to the samples held in ``self.test_data``.

    The x range is only extended: once the newest timestamp passes 95% of
    the current upper limit, both axes get a new limit 10% beyond it. The
    voltage axis is padded by 0.2 and kept within 0..5.0; the current axis
    is padded by 0.05 and kept within -0.25..0.25.
    """
    times = self.test_data['time']
    if not times:
        return

    # X-axis adjustment
    latest = times[-1]
    _, x_upper = self.ax.get_xlim()
    if latest > x_upper * 0.95:
        widened = latest * 1.10  # 10% padding
        self.ax.set_xlim(0, widened)
        self.ax2.set_xlim(0, widened)

    # Y-axes adjustment
    voltages = self.test_data['voltage']
    if voltages:
        v_pad = 0.2
        v_low = max(0, min(voltages) - v_pad)
        v_high = min(5.0, max(voltages) + v_pad)
        self.ax.set_ylim(v_low, v_high)

    currents = self.test_data['current']
    if currents:
        c_pad = 0.05
        c_low = max(-0.25, min(currents) - c_pad)
        c_high = min(0.25, max(currents) + c_pad)
        self.ax2.set_ylim(c_low, c_high)
@staticmethod
def format_time(seconds: float) -> str:
    """Render a duration in seconds as ``HH:MM:SS``.

    Fractions of a second are dropped. Hours are not wrapped at 24, so
    long durations simply show a larger hour field.

    Args:
        seconds: Duration in seconds.

    Returns:
        The zero-padded ``HH:MM:SS`` string.
    """
    whole_hours, within_hour = divmod(seconds, 3600)
    hh = int(whole_hours)
    mm = int(within_hour // 60)
    ss = int(seconds % 60)
    return f"{hh:02d}:{mm:02d}:{ss:02d}"
def reset_plot(self):
    """Clear both traces and restore the initial axis ranges.

    The x axis returns to 0..10. The voltage axis is derived from the
    discharge and charge cutoff input fields with 0.2 of padding, and its
    lower bound never drops below 0.
    """
    for trace in (self.line_voltage, self.line_current):
        trace.set_data([], [])

    # Reset axes to starting values
    self.ax.set_xlim(0, 10)
    pad = 0.2
    lower = max(0, float(self.discharge_cutoff_input.text()) - pad)
    upper = float(self.charge_cutoff_input.text()) + pad
    self.ax.set_ylim(lower, upper)
    self.canvas.draw()
def closeEvent(self, event):
    """Handle the window-close event.

    ``cleanup_device`` runs first; the event is then accepted so the
    close proceeds.

    Args:
        event: The close event delivered to the window.
    """
    # Release the device before the close is accepted.
    self.cleanup_device()
    event.accept()
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window
    # and run the event loop until the window is closed.
    import sys

    app = QApplication([])
    app.setStyle('Fusion')
    window = BatteryTester()
    window.show()
    # Pass the event loop's return code on as the process exit status
    # instead of discarding it.
    sys.exit(app.exec_())