Adalm1000_Battery_SMU/MainCode/adalm1000_logger.py
Jan 882f19945c MainCode/adalm1000_logger.py aktualisiert
Key changes made:

    Added os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false' at the beginning to suppress the QStandardPaths warning.

    Added QGridLayout to the imports from PyQt5.QtWidgets.

    Fixed the display layout section to properly use QGridLayout.

    Made sure all necessary Qt classes are imported at the top of the file.
(D)
2025-06-27 16:18:35 +02:00

1033 lines
40 KiB
Python

# -*- coding: utf-8 -*-
import os
import time
import csv
import threading
from datetime import datetime
import numpy as np
from collections import deque
# Suppress QStandardPaths warning
os.environ['QT_LOGGING_RULES'] = 'qt.qpa.*=false'
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QGridLayout, QLabel, QPushButton, QLineEdit, QFrame,
QCheckBox, QMessageBox, QFileDialog)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject
from PyQt5.QtGui import QColor, QPalette
import pysmu
import matplotlib
matplotlib.use('Qt5Agg')
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
class DeviceDisconnectedError(Exception):
pass
class MeasurementThread(QObject):
update_signal = pyqtSignal(float, float, float)
error_signal = pyqtSignal(str)
def __init__(self, device, interval=0.1):
super().__init__()
self.device = device
self.interval = interval
self.running = False
self.filter_window_size = 10
self.start_time = time.time()
def run(self):
self.running = True
voltage_window = []
current_window = []
while self.running:
try:
samples = self.device.read(self.filter_window_size, 500, True)
if not samples:
raise DeviceDisconnectedError("No samples received")
raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage
raw_current = np.mean([s[0][1] for s in samples]) # Channel A current
current_time = time.time() - self.start_time
# Apply moving average filter
voltage_window.append(raw_voltage)
current_window.append(raw_current)
if len(voltage_window) > self.filter_window_size:
voltage_window.pop(0)
current_window.pop(0)
voltage = np.mean(voltage_window)
current = np.mean(current_window)
self.update_signal.emit(voltage, current, current_time)
time.sleep(max(0.05, self.interval))
except Exception as e:
self.error_signal.emit(str(e))
break
def stop(self):
self.running = False
class BatteryTester(QMainWindow):
def __init__(self):
super().__init__()
# Color scheme
self.bg_color = QColor(46, 52, 64)
self.fg_color = QColor(216, 222, 233)
self.accent_color = QColor(94, 129, 172)
self.warning_color = QColor(191, 97, 106)
self.success_color = QColor(163, 190, 140)
# Device and measurement state
self.session_active = False
self.measuring = False
self.test_running = False
self.continuous_mode = False
self.request_stop = False
self.interval = 0.1
self.log_dir = os.path.expanduser("~/adalm1000/logs")
os.makedirs(self.log_dir, exist_ok=True)
# Initialize UI
self.setup_ui()
self.init_device()
# Track measurement thread
self.measurement_thread = None
self.measurement_worker = None
# Track test thread
self.test_thread = None
# Data buffers
self.time_data = deque()
self.voltage_data = deque()
self.current_data = deque()
self.phase_data = deque()
def setup_ui(self):
"""Configure the user interface"""
self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)")
self.resize(1000, 800)
self.setMinimumSize(800, 700)
# Set background color
palette = self.palette()
palette.setColor(QPalette.Window, self.bg_color)
palette.setColor(QPalette.WindowText, self.fg_color)
palette.setColor(QPalette.Base, QColor(59, 66, 82))
palette.setColor(QPalette.Text, self.fg_color)
palette.setColor(QPalette.Button, self.accent_color)
palette.setColor(QPalette.ButtonText, self.fg_color)
self.setPalette(palette)
# Main widget and layout
self.central_widget = QWidget()
self.setCentralWidget(self.central_widget)
self.main_layout = QVBoxLayout(self.central_widget)
self.main_layout.setContentsMargins(10, 10, 10, 10)
self.main_layout.setSpacing(10)
# Header area
header_frame = QWidget()
header_layout = QHBoxLayout(header_frame)
header_layout.setContentsMargins(0, 0, 0, 0)
self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)")
self.title_label.setStyleSheet(f"font-size: 16px; font-weight: bold; color: {self.accent_color.name()};")
header_layout.addWidget(self.title_label, 1)
# Status indicator
self.connection_label = QLabel("Disconnected")
header_layout.addWidget(self.connection_label)
self.status_light = QLabel()
self.status_light.setFixedSize(20, 20)
self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
header_layout.addWidget(self.status_light)
# Reconnect button
self.reconnect_btn = QPushButton("Reconnect")
self.reconnect_btn.clicked.connect(self.reconnect_device)
header_layout.addWidget(self.reconnect_btn)
self.main_layout.addWidget(header_frame)
# Measurement display
display_frame = QFrame()
display_frame.setFrameShape(QFrame.StyledPanel)
display_layout = QGridLayout(display_frame)
measurement_labels = [
("Voltage (V)", "V"),
("Current (A)", "A"),
("Test Phase", ""),
("Elapsed Time", "s"),
("Discharge Capacity", "Ah"),
("Charge Capacity", "Ah"),
("Coulomb Eff.", "%"),
("Cycle Count", ""),
]
for i, (label, unit) in enumerate(measurement_labels):
row = i // 2
col = (i % 2) * 2
lbl = QLabel(f"{label}:")
lbl.setStyleSheet("font-size: 11px;")
display_layout.addWidget(lbl, row, col)
value_label = QLabel("0.000")
value_label.setStyleSheet("font-size: 12px; font-weight: bold;")
display_layout.addWidget(value_label, row, col + 1)
if unit:
unit_label = QLabel(unit)
display_layout.addWidget(unit_label, row, col + 2)
if i == 0:
self.voltage_label = value_label
elif i == 1:
self.current_label = value_label
elif i == 2:
self.phase_label = value_label
elif i == 3:
self.time_label = value_label
elif i == 4:
self.capacity_label = value_label
elif i == 5:
self.charge_capacity_label = value_label
elif i == 6:
self.efficiency_label = value_label
elif i == 7:
self.cycle_label = value_label
self.main_layout.addWidget(display_frame)
# Control area
controls_frame = QWidget()
controls_layout = QHBoxLayout(controls_frame)
controls_layout.setContentsMargins(0, 0, 0, 0)
# Parameters frame
params_frame = QFrame()
params_frame.setFrameShape(QFrame.StyledPanel)
params_layout = QGridLayout(params_frame)
# Battery capacity
params_layout.addWidget(QLabel("Battery Capacity (Ah):"), 0, 0)
self.capacity_input = QLineEdit("0.2")
self.capacity_input.setFixedWidth(80)
params_layout.addWidget(self.capacity_input, 0, 1)
# Charge cutoff
params_layout.addWidget(QLabel("Charge Cutoff (V):"), 1, 0)
self.charge_cutoff_input = QLineEdit("1.43")
self.charge_cutoff_input.setFixedWidth(80)
params_layout.addWidget(self.charge_cutoff_input, 1, 1)
# Discharge cutoff
params_layout.addWidget(QLabel("Discharge Cutoff (V):"), 2, 0)
self.discharge_cutoff_input = QLineEdit("0.9")
self.discharge_cutoff_input.setFixedWidth(80)
params_layout.addWidget(self.discharge_cutoff_input, 2, 1)
# Rest time
params_layout.addWidget(QLabel("Rest Time (hours):"), 3, 0)
self.rest_time_input = QLineEdit("0.25")
self.rest_time_input.setFixedWidth(80)
params_layout.addWidget(self.rest_time_input, 3, 1)
# C-rate for test
params_layout.addWidget(QLabel("Test C-rate:"), 0, 2)
self.c_rate_input = QLineEdit("0.1")
self.c_rate_input.setFixedWidth(60)
params_layout.addWidget(self.c_rate_input, 0, 3)
params_layout.addWidget(QLabel("(e.g., 0.2 for C/5)"), 0, 4)
controls_layout.addWidget(params_frame, 1)
# Button frame
button_frame = QWidget()
button_layout = QVBoxLayout(button_frame)
button_layout.setContentsMargins(0, 0, 0, 0)
self.start_button = QPushButton("START TEST")
self.start_button.clicked.connect(self.start_test)
self.start_button.setStyleSheet(f"background-color: {self.accent_color.name()}; font-weight: bold;")
button_layout.addWidget(self.start_button)
self.stop_button = QPushButton("STOP TEST")
self.stop_button.clicked.connect(self.stop_test)
self.stop_button.setStyleSheet(f"background-color: {self.warning_color.name()}; font-weight: bold;")
self.stop_button.setEnabled(False)
button_layout.addWidget(self.stop_button)
# Continuous mode checkbox
self.continuous_check = QCheckBox("Continuous Mode")
self.continuous_check.setChecked(True)
button_layout.addWidget(self.continuous_check)
controls_layout.addWidget(button_frame)
self.main_layout.addWidget(controls_frame)
# Plot area
self.setup_plot()
self.main_layout.addWidget(self.plot_widget, 1)
# Status bar
self.status_bar = QLabel("Ready")
self.status_bar.setStyleSheet("font-size: 10px; padding: 5px;")
self.main_layout.addWidget(self.status_bar)
# Initialize test phase
self.test_phase = "Idle"
self.phase_label.setText(self.test_phase)
# Initialize capacity and efficiency
self.capacity_ah = 0.0
self.charge_capacity = 0.0
self.coulomb_efficiency = 0.0
self.cycle_count = 0
def setup_plot(self):
"""Configure the matplotlib plot"""
self.plot_widget = QWidget()
plot_layout = QVBoxLayout(self.plot_widget)
self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440')
self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
self.ax = self.fig.add_subplot(111)
self.ax.set_facecolor('#3B4252')
# Set initial voltage range
voltage_padding = 0.2
min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding)
max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding
self.ax.set_ylim(min_voltage, max_voltage)
# Voltage plot
self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2)
self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
self.ax.tick_params(axis='y', labelcolor='#00BFFF')
# Current plot (right axis)
self.ax2 = self.ax.twinx()
current_padding = 0.05
test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text())
max_current = test_current * 1.5
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
self.ax2.set_ylabel("Current (A)", color='r')
self.ax2.tick_params(axis='y', labelcolor='r')
self.ax.set_xlabel('Time (s)', color=self.fg_color.name())
self.ax.set_title('Battery Test (CC)', color=self.fg_color.name())
self.ax.tick_params(axis='x', colors=self.fg_color.name())
self.ax.grid(True, color='#4C566A')
# Position legends
self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))
# Embed plot
self.canvas = FigureCanvas(self.fig)
plot_layout.addWidget(self.canvas)
def init_device(self):
"""Initialize the ADALM1000 device with continuous measurement"""
try:
# Clean up any existing session
if hasattr(self, 'session'):
try:
self.session.end()
del self.session
except:
pass
time.sleep(1)
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
if not self.session.devices:
raise Exception("No ADALM1000 detected - check connections")
self.dev = self.session.devices[0]
# Reset channels
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
self.dev.channels['B'].constant(0)
self.session.start(0)
self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;")
self.connection_label.setText("Connected")
self.status_bar.setText("Device connected | Ready to measure")
self.session_active = True
self.start_button.setEnabled(True)
# Start measurement thread
self.start_measurement_thread()
except Exception as e:
self.handle_device_error(e)
def start_measurement_thread(self):
"""Start the continuous measurement thread"""
if hasattr(self, 'measurement_thread') and self.measurement_thread.isRunning():
self.measurement_thread.quit()
self.measurement_thread.wait()
self.measurement_thread = QThread()
self.measurement_worker = MeasurementThread(self.dev, self.interval)
self.measurement_worker.moveToThread(self.measurement_thread)
self.measurement_thread.started.connect(self.measurement_worker.run)
self.measurement_worker.update_signal.connect(self.update_measurements)
self.measurement_worker.error_signal.connect(self.handle_device_error)
self.measurement_thread.start()
@pyqtSlot(float, float, float)
def update_measurements(self, voltage, current, current_time):
"""Update measurements from the measurement thread"""
self.time_data.append(current_time)
self.voltage_data.append(voltage)
self.current_data.append(current)
# Update display
self.voltage_label.setText(f"{voltage:.4f}")
self.current_label.setText(f"{current:.4f}")
self.time_label.setText(self.format_time(current_time))
# Update plot periodically
if len(self.time_data) % 10 == 0:
self.update_plot()
# Log data if test is running
if self.test_running and hasattr(self, 'current_cycle_file'):
self.log_buffer.append([
f"{current_time:.3f}",
f"{voltage:.6f}",
f"{current:.6f}",
self.test_phase,
f"{self.capacity_ah:.4f}",
f"{self.charge_capacity:.4f}",
f"{self.coulomb_efficiency:.1f}",
f"{self.cycle_count}"
])
# Write in chunks of 10 samples
if len(self.log_buffer) >= 10:
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerows(self.log_buffer)
self.log_buffer.clear()
def start_test(self):
"""Start the full battery test cycle"""
if not self.test_running:
try:
# Get values from inputs
capacity = float(self.capacity_input.text())
charge_cutoff = float(self.charge_cutoff_input.text())
discharge_cutoff = float(self.discharge_cutoff_input.text())
c_rate = float(self.c_rate_input.text())
# Validate inputs
if capacity <= 0:
raise ValueError("Battery capacity must be positive")
if charge_cutoff <= discharge_cutoff:
raise ValueError("Charge cutoff must be higher than discharge cutoff")
if c_rate <= 0:
raise ValueError("C-rate must be positive")
self.continuous_mode = self.continuous_check.isChecked()
# Reset timing for new test
self.measurement_start_time = time.time()
self.test_start_time = time.time()
# Calculate target current
test_current = c_rate * capacity
if test_current > 0.2:
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
# Clear previous data
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
self.phase_data.clear()
self.capacity_ah = 0.0
self.charge_capacity = 0.0
self.coulomb_efficiency = 0.0
self.cycle_count = 0
# Reset plot
self.reset_plot()
# Generate base filename without cycle number
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
self.current_cycle_file = None
# Start test
self.test_running = True
self.start_time = time.time()
self.last_update_time = time.time()
self.test_phase = "Initial Discharge"
self.phase_label.setText(self.test_phase)
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
self.status_bar.setText(f"Test started | Discharging to {discharge_cutoff}V @ {test_current:.3f}A")
# Start test sequence in a new thread
self.test_thread = threading.Thread(target=self.run_test_sequence, daemon=True)
self.test_thread.start()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
def create_cycle_log_file(self):
"""Create a new log file for the current cycle"""
# Close previous file if exists
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
try:
self.current_cycle_file.close()
except Exception as e:
print(f"Error closing previous log file: {e}")
# Check write permissions
if not os.access(self.log_dir, os.W_OK):
QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}")
return False
# Create new log file with sequential suffix
suffix = 1
while True:
self.filename = f"{self.base_filename}_{suffix}.csv"
if not os.path.exists(self.filename):
break
suffix += 1
try:
self.current_cycle_file = open(self.filename, 'w', newline='')
self.log_writer = csv.writer(self.current_cycle_file)
self.log_writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase",
"Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
"Coulomb_Eff(%)", "Cycle"])
self.log_buffer = []
return True
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to create log file: {e}")
return False
@staticmethod
def format_time(seconds):
"""Convert seconds to hh:mm:ss format"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def stop_test(self):
"""Request immediate stop of the test and clean up all test data"""
if not self.test_running:
return
self.request_stop = True
self.test_running = False
self.measuring = False
self.test_phase = "Idle"
self.phase_label.setText(self.test_phase)
# Immediately set device to safe state
if hasattr(self, 'dev'):
try:
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
except Exception as e:
print(f"Error resetting device: {e}")
# Clear all data buffers
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
self.phase_data.clear()
# Reset test values
self.capacity_ah = 0.0
self.charge_capacity = 0.0
self.coulomb_efficiency = 0.0
# Reset plot
self.reset_plot()
# Update UI
self.status_bar.setText("Test stopped - Ready for new test")
self.stop_button.setEnabled(False)
self.start_button.setEnabled(True)
# Finalize test data (logs, etc.)
QTimer.singleShot(100, self.finalize_test)
def run_test_sequence(self):
try:
test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text())
charge_cutoff = float(self.charge_cutoff_input.text())
discharge_cutoff = float(self.discharge_cutoff_input.text())
while self.test_running and (self.continuous_mode or self.cycle_count == 0):
# Reset stop request at start of each cycle
self.request_stop = False
self.cycle_count += 1
self.cycle_label.setText(f"{self.cycle_count}")
# Create new log file for this cycle
self.create_cycle_log_file()
# 1. Charge phase (constant current)
self.test_phase = "Charge"
self.phase_label.setText(self.test_phase)
self.status_bar.setText(f"Charging to {charge_cutoff}V @ {test_current:.3f}A")
self.measuring = True
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].mode = pysmu.Mode.SIMV
self.dev.channels['A'].constant(test_current)
self.charge_capacity = 0.0
self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}")
target_voltage = charge_cutoff
self.last_update_time = time.time()
while self.test_running and not self.request_stop:
if not self.voltage_data:
time.sleep(0.1)
continue
current_voltage = self.voltage_data[-1]
measured_current = abs(self.current_data[-1])
# Update charge capacity
now = time.time()
delta_t = now - self.last_update_time
self.last_update_time = now
self.charge_capacity += measured_current * delta_t / 3600
self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}")
self.status_bar.setText(
f"Charging: {current_voltage:.3f}V / {target_voltage}V | "
f"Current: {measured_current:.3f}A | "
f"Capacity: {self.charge_capacity:.4f}Ah"
)
if current_voltage >= target_voltage or self.request_stop:
break
time.sleep(0.1) # More frequent checks
if self.request_stop or not self.test_running:
break
# 2. Rest period after charge
self.test_phase = "Resting (Post-Charge)"
self.phase_label.setText(self.test_phase)
self.measuring = False
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
rest_end_time = time.time() + (float(self.rest_time_input.text()) * 3600)
while time.time() < rest_end_time and self.test_running and not self.request_stop:
time_left = max(0, rest_end_time - time.time())
self.status_bar.setText(
f"Resting after charge | "
f"Time left: {time_left/60:.1f} min"
)
time.sleep(1) # Check every second for stop request
if self.request_stop or not self.test_running:
break
# 3. Discharge phase (capacity measurement)
self.test_phase = "Discharge"
self.phase_label.setText(self.test_phase)
self.status_bar.setText(f"Discharging to {discharge_cutoff}V @ {test_current:.3f}A")
self.measuring = True
self.dev.channels['A'].mode = pysmu.Mode.SIMV
self.dev.channels['A'].constant(-test_current)
self.capacity_ah = 0.0
self.capacity_label.setText(f"{self.capacity_ah:.4f}")
self.last_update_time = time.time()
while self.test_running and not self.request_stop:
if not self.current_data:
time.sleep(0.1)
continue
current_voltage = self.voltage_data[-1]
current_current = abs(self.current_data[-1])
# Capacity calculation
now = time.time()
delta_t = now - self.last_update_time
self.last_update_time = now
self.capacity_ah += current_current * delta_t / 3600
self.capacity_label.setText(f"{self.capacity_ah:.4f}")
if not self.continuous_check.isChecked() and self.continuous_mode:
self.continuous_mode = False
self.status_bar.setText(
f"Continuous Mode disabled | "
f"Discharging to {discharge_cutoff}V (will stop after this cycle) | "
f"Current: {current_current:.3f}A | "
f"Capacity: {self.capacity_ah:.4f}Ah"
)
else:
# Default status message
self.status_bar.setText(
f"Discharging: {current_voltage:.3f}V / {discharge_cutoff}V | "
f"Current: {current_current:.3f}A | "
f"Capacity: {self.capacity_ah:.4f}Ah"
)
if current_voltage <= discharge_cutoff or self.request_stop:
break
if not self.continuous_check.isChecked():
self.test_running = False
self.test_phase = "Idle"
self.phase_label.setText(self.test_phase)
break # Exit the main test loop
# 4. Rest period after discharge (only if not stopping)
if self.test_running and not self.request_stop:
self.test_phase = "Resting (Post-Discharge)"
self.phase_label.setText(self.test_phase)
self.measuring = False
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
rest_end_time = time.time() + (float(self.rest_time_input.text()) * 3600)
while time.time() < rest_end_time and self.test_running and not self.request_stop:
time_left = max(0, rest_end_time - time.time())
self.status_bar.setText(
f"Resting after discharge | "
f"Time left: {time_left/60:.1f} min"
)
time.sleep(1)
# Calculate Coulomb efficiency if not stopping
if not self.request_stop and self.charge_capacity > 0:
efficiency = (self.capacity_ah / self.charge_capacity) * 100
self.coulomb_efficiency = efficiency
self.efficiency_label.setText(f"{self.coulomb_efficiency:.1f}")
# Update cycle info
self.status_bar.setText(
f"Cycle {self.cycle_count} complete | "
f"Discharge: {self.capacity_ah:.3f}Ah | "
f"Charge: {self.charge_capacity:.3f}Ah | "
f"Efficiency: {self.coulomb_efficiency:.1f}%"
)
# Write cycle summary to log file
self.write_cycle_summary()
# Flush remaining buffer data
if hasattr(self, 'log_buffer') and self.log_buffer:
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerows(self.log_buffer)
self.log_buffer.clear()
# Finalize test if stopped or completed
self.finalize_test()
except Exception as e:
error_msg = str(e)
QTimer.singleShot(0, lambda: QMessageBox.critical(self, "Test Error", error_msg))
self.finalize_test()
def finalize_test(self):
"""Final cleanup after test completes or is stopped"""
self.measuring = False
if hasattr(self, 'dev'):
try:
self.dev.channels['A'].constant(0)
except Exception as e:
print(f"Error resetting device: {e}")
# Flush and close current log file
if hasattr(self, 'log_buffer') and self.log_buffer and hasattr(self, 'log_writer'):
try:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
except Exception as e:
print(f"Error flushing log buffer: {e}")
if hasattr(self, 'current_cycle_file'):
try:
self.current_cycle_file.close()
except Exception as e:
print(f"Error closing log file: {e}")
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.request_stop = False
message = (
f"Test safely stopped after discharge phase | "
f"Cycle {self.cycle_count} completed | "
f"Final capacity: {self.capacity_ah:.3f}Ah"
)
self.status_bar.setText(message)
QMessageBox.information(
self,
"Test Completed",
f"Test was safely stopped after discharge phase.\n\n"
f"Final discharge capacity: {self.capacity_ah:.3f}Ah\n"
f"Total cycles completed: {self.cycle_count}"
)
def reset_plot(self):
"""Reset the plot completely for a new test"""
# Clear the data lines
self.line_voltage.set_data([], [])
self.line_current.set_data([], [])
# Reset the data buffers
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
# Set reasonable initial axis ranges
voltage_padding = 0.2
min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding)
max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding
self.ax.set_xlim(0, 10) # 10s initial range
self.ax.set_ylim(min_voltage, max_velocity)
current_padding = 0.05
test_current = float(self.c_rate_input.text()) * float(self.capacity_input.text())
max_current = test_current * 1.5 # 50% padding
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
# Force redraw
self.canvas.draw()
def write_cycle_summary(self):
"""Write cycle summary to the current cycle's log file"""
if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file:
return
summary_line = (
f"Cycle {self.cycle_count} Summary - "
f"Discharge={self.capacity_ah:.4f}Ah, "
f"Charge={self.charge_capacity:.4f}Ah, "
f"Efficiency={self.coulomb_efficiency:.1f}%"
)
# Ensure file is open and write summary
try:
if hasattr(self, 'log_buffer') and self.log_buffer:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
self.current_cycle_file.write(summary_line + "\n")
self.current_cycle_file.flush()
except Exception as e:
print(f"Error writing cycle summary: {e}")
def update_plot(self):
"""Optimized plot update with change detection"""
if not self.time_data:
return
# Force update more frequently at start of test
if len(self.time_data) < 10 or (time.time() - getattr(self, '_last_plot_time', 0)) > 1.0:
self.line_voltage.set_data(self.time_data, self.voltage_data)
self.line_current.set_data(self.time_data, self.current_data)
self.auto_scale_axes()
self.canvas.draw_idle()
self._last_plot_time = time.time()
def auto_scale_axes(self):
"""Auto-scale plot axes with appropriate padding and strict boundaries"""
if not self.time_data:
return
# X-axis scaling with 5% padding but don't exceed current max time
min_time = 0
max_time = self.time_data[-1]
current_xlim = self.ax.get_xlim()
# Only expand axis if new data exceeds current view
if max_time > current_xlim[1] * 0.95: # 95% threshold to start expanding
new_max = max_time * 1.05 # 5% padding
self.ax.set_xlim(min_time, new_max)
self.ax2.set_xlim(min_time, new_max)
# Voltage axis scaling with strict boundaries
voltage_padding = 0.2
if self.voltage_data:
min_voltage = max(0, min(self.voltage_data) - voltage_padding)
max_voltage = min(5.0, max(self.voltage_data) + voltage_padding) # 5V hardware limit
current_ylim = self.ax.get_ylim()
if (abs(current_ylim[0] - min_voltage) > 0.1 or
abs(current_ylim[1] - max_voltage) > 0.1):
self.ax.set_ylim(min_voltage, max_voltage)
# Current axis scaling with strict boundaries
current_padding = 0.05
if self.current_data:
min_current = max(-0.25, min(self.current_data) - current_padding) # -250mA limit
max_current = min(0.25, max(self.current_data) + current_padding) # +250mA limit
current_ylim2 = self.ax2.get_ylim()
if (abs(current_ylim2[0] - min_current) > 0.02 or
abs(current_ylim2[1] - max_current) > 0.02):
self.ax2.set_ylim(min_current, max_current)
def handle_device_error(self, error_msg):
"""Handle device connection errors"""
error_msg = str(error_msg)
print(f"Device error: {error_msg}")
# Clean up session first
if hasattr(self, 'session'):
try:
if self.session_active:
self.session.end()
del self.session
except Exception as e:
print(f"Error cleaning up session: {e}")
# Update UI
self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
self.connection_label.setText("Disconnected")
self.status_bar.setText(f"Device error: {error_msg}")
self.session_active = False
self.test_running = False
self.continuous_mode = False
self.measuring = False
self.start_button.setEnabled(False)
self.stop_button.setEnabled(False)
# Clear plot + buffers
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'):
self.line_voltage.set_data([], [])
self.line_current.set_data([], [])
self.ax.set_xlim(0, 1)
self.ax2.set_xlim(0, 1)
self.canvas.draw()
# Show error message and attempt reconnect automatically
QTimer.singleShot(100, self.attempt_reconnect)
def attempt_reconnect(self):
"""Attempt to reconnect automatically"""
try:
# Show error message first
QMessageBox.critical(
self,
"Device Connection Error",
"Could not connect to ADALM1000\n\n"
"1. Check USB cable connection\n"
"2. The device will attempt to reconnect automatically"
)
except Exception as e:
print(f"Error showing message: {e}")
return
# Schedule reconnect attempt
QTimer.singleShot(1000, self.reconnect_device)
def reconnect_device(self):
"""Reconnect the device with proper cleanup"""
self.status_bar.setText("Attempting to reconnect...")
# Clear any existing session
if hasattr(self, 'session'):
try:
if self.session_active:
self.session.end()
del self.session
except:
pass
# Stop any running threads
self.test_running = False
self.continuous_mode = False
self.measuring = False
if hasattr(self, 'measurement_thread'):
self.measurement_thread.quit()
self.measurement_thread.wait()
# Add small delay to allow device to reset
time.sleep(1.5)
# Try to initialize device
try:
self.init_device()
if self.session_active:
self.status_bar.setText("Reconnected successfully")
return
except Exception as e:
print(f"Reconnect failed: {e}")
# If we get here, reconnection failed
self.status_bar.setText("Reconnect failed - will retry...")
QTimer.singleShot(2000, self.reconnect_device) # Retry after 2 seconds
def closeEvent(self, event):
"""Clean up on window close"""
# Set flags to stop all threads
self.test_running = False
self.measuring = False
self.session_active = False
if hasattr(self, 'measurement_thread'):
self.measurement_thread.quit()
self.measurement_thread.wait()
# Clean up device session
if hasattr(self, 'session') and self.session:
try:
self.session.end()
except Exception as e:
print(f"Error ending session: {e}")
event.accept()
if __name__ == "__main__":
app = QApplication([])
# Set application style
app.setStyle('Fusion')
# Create and show main window
window = BatteryTester()
window.show()
# Run application
app.exec_()