# Adalm1000_Battery_SMU/MainCode/adalm1000_logger.py
# 2025-08-07 01:59:33 +02:00
#
# 2663 lines
# 111 KiB
# Python

# -*- coding: utf-8 -*-
import os
import time
import csv
import threading
import traceback
from datetime import datetime
import numpy as np
import matplotlib
matplotlib.use('Qt5Agg')
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
from collections import deque
from queue import Queue, Full, Empty
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QGridLayout, QLabel, QPushButton, QLineEdit, QCheckBox,
QFrame, QMessageBox, QFileDialog, QComboBox)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread
from PyQt5.QtGui import QDoubleValidator
from PyQt5 import sip
import pysmu
class DeviceManager:
    """State container for a single measurement device.

    Holds the device handle, the measurement thread polling it, the raw and
    display data buffers, the running test counters and the read-error
    bookkeeping used to decide when a device reset is attempted.
    """

    def __init__(self, dev):
        """Create the manager for *dev*.

        Args:
            dev: Device object; its ``serial`` attribute is copied to
                ``self.serial``.
        """
        self.dev = dev
        self.serial = dev.serial
        self.measurement_thread = None
        self.is_running = False

        # Data buffers (bounded so long runs cannot grow without limit)
        max_data_points = 36000
        self.time_data = deque(maxlen=max_data_points)
        self.voltage_data = deque(maxlen=max_data_points)
        self.current_data = deque(maxlen=max_data_points)
        self.display_time_data = deque(maxlen=10000)
        self.display_voltage_data = deque(maxlen=10000)
        self.display_current_data = deque(maxlen=10000)

        # Test state
        self.capacity_ah = 0.0
        self.energy = 0.0
        self.charge_capacity = 0.0
        self.coulomb_efficiency = 0.0
        self.cycle_count = 0
        self.test_phase = "Idle"
        self.start_time = time.time()

        # Logging
        self.current_cycle_file = None
        self.log_writer = None

        # Downsampling
        self.downsample_factor = 1
        self.aggregation_buffer = self._new_aggregation_buffer()

        self.consecutive_read_errors = 0  # Track read failures
        self.max_consecutive_errors = 5   # Threshold before reset

    @staticmethod
    def _new_aggregation_buffer():
        """Return a fresh, empty aggregation buffer."""
        return {
            'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0
        }

    def handle_read_error(self, increment=1, reset_delay=0.5):
        """Record read failures and reset the device once the threshold is hit.

        Args:
            increment: Number of failures to add to the counter.
            reset_delay: Seconds to wait between ending the old session and
                creating the new one.

        Returns:
            True while below the threshold or after a successful reset,
            False when the reset attempt raised.
        """
        self.consecutive_read_errors += increment
        if self.consecutive_read_errors < self.max_consecutive_errors:
            return True  # Under threshold - continue

        try:
            print("Resetting device due to persistent errors")
            self.dev.session.end()
            time.sleep(reset_delay)
            # NOTE(review): PatchedSession is not defined in the visible part
            # of this file - presumably declared elsewhere; confirm.
            self.dev.session = PatchedSession()
            self.dev.session.start(0)
        except Exception as e:
            print(f"Device reset failed: {e}")
            return False  # Unrecoverable error

        # Start counting afresh; otherwise every later call would be at or
        # above the threshold and trigger another reset immediately.
        self.consecutive_read_errors = 0
        return True  # Recovery attempted

    def start_measurement(self, interval=0.1):
        """Start a new measurement thread, stopping any existing one first.

        Args:
            interval: Polling interval in seconds handed to the thread.
        """
        self.stop_measurement()  # Ensure any existing thread is stopped
        self.measurement_thread = MeasurementThread(self.dev, interval, self)
        self.measurement_thread.start()
        self.is_running = True

    def stop_measurement(self):
        """Disconnect and stop the measurement thread, if there is one."""
        thread = self.measurement_thread
        if thread:
            try:
                # Disconnect each signal on its own so that a signal without
                # connections does not prevent the other being disconnected.
                for signal in (thread.update_signal, thread.error_signal):
                    try:
                        signal.disconnect()
                    except (RuntimeError, TypeError):
                        pass
                # Then stop the thread
                thread.stop()
                if not thread.wait(500):
                    thread.terminate()
                self.measurement_thread = None
            except Exception as e:
                print(f"Error stopping measurement thread: {e}")
        self.is_running = False

    def reset_data(self):
        """Clear all buffers and return the test counters to their defaults."""
        for buffer in (self.time_data, self.voltage_data, self.current_data,
                       self.display_time_data, self.display_voltage_data,
                       self.display_current_data):
            buffer.clear()
        self.aggregation_buffer = self._new_aggregation_buffer()
        self.capacity_ah = 0.0
        self.energy = 0.0
        self.charge_capacity = 0.0
        self.coulomb_efficiency = 0.0
        self.cycle_count = 0
        self.start_time = time.time()
        self.test_phase = "Idle"
class DeviceDisconnectedError(Exception):
    """Raised by the measurement loop when read failures persist and recovery fails."""
class MeasurementThread(QThread):
    """Polling thread that reads one device and publishes filtered readings.

    Signals:
        update_signal(float, float, float): filtered voltage, filtered
            current and seconds elapsed since ``start_time``.
        error_signal(str): description of a read or validation problem.
    """
    update_signal = pyqtSignal(float, float, float)
    error_signal = pyqtSignal(str)

    def __init__(self, device, interval, parent_manager):
        """Store the polling configuration; the loop itself runs in ``run``.

        Args:
            device: Object whose ``read(...)`` method returns the samples.
            interval: Requested pause between readings in seconds.
            parent_manager: Object whose ``handle_read_error()`` is called
                after repeated failures (may be None).
        """
        super().__init__()
        self.device = device
        self.interval = interval
        self._running = False
        self.filter_window_size = 10
        # Plain lists (not deques) so code that replaces or clears them from
        # outside keeps working; run() trims them explicitly.
        self.voltage_window = []
        self.current_window = []
        self.start_time = None
        # Single-slot queue: consumers should only ever see the newest reading
        self.measurement_queue = Queue(maxsize=1)
        self.current_direction = 1
        self.parent_manager = parent_manager

    def stop(self):
        """Ask the loop to finish and terminate the thread if it does not."""
        self._running = False
        if not self.wait(500):  # Wait up to 500ms for clean shutdown
            self.terminate()

    def _attempt_recovery(self):
        """Ask the parent manager to recover; True when the loop may go on."""
        manager = getattr(self, 'parent_manager', None)
        if manager is None:
            return True
        return bool(manager.handle_read_error())

    def _publish_latest(self, voltage, current):
        """Place the reading in the queue, replacing a stale one if present.

        Skipping the put when the queue is full would leave the oldest value
        in the slot, so a consumer that starts later would act on stale data.
        """
        reading = (voltage, current)
        try:
            self.measurement_queue.put_nowait(reading)
            return
        except Full:
            pass
        try:
            self.measurement_queue.get_nowait()  # Drop the stale reading
        except Empty:
            pass  # A consumer took it in the meantime
        try:
            self.measurement_queue.put_nowait(reading)
        except Full:
            pass  # Another producer won the race; its value is just as fresh

    def run(self):
        """Continuous measurement loop with error counting and recovery."""
        self._running = True
        self.start_time = time.time()  # This gets reset when mode changes
        consecutive_errors = 0
        max_consecutive_errors = 5
        while self._running:
            try:
                samples = self.device.read(self.filter_window_size, 500, True)

                # --- Handle empty samples ---
                if not samples:
                    consecutive_errors += 1
                    if consecutive_errors >= max_consecutive_errors:
                        if not self._attempt_recovery():
                            raise DeviceDisconnectedError("Persistent read failures")
                        consecutive_errors = 0  # Reset after handling
                    time.sleep(0.1)
                    continue

                # Reset error counter on successful read
                consecutive_errors = 0

                # --- Process samples ---
                current_time = time.time() - self.start_time
                # Voltage from Channel B (HI_Z mode), current from Channel A
                raw_voltage = np.mean([s[1][0] for s in samples])
                raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction

                # Update the moving-average windows
                self.voltage_window.append(raw_voltage)
                self.current_window.append(raw_current)
                if len(self.voltage_window) > self.filter_window_size:
                    self.voltage_window.pop(0)
                    self.current_window.pop(0)
                voltage = np.mean(self.voltage_window)
                current = np.mean(self.current_window)

                # Validate measurements before processing
                if not (0.0 <= voltage <= 5.0):  # ADALM1000 voltage range
                    raise ValueError(f"Voltage out of range: {voltage:.4f}V")
                if not (-0.25 <= current <= 0.25):  # ADALM1000 current range
                    raise ValueError(f"Current out of range: {current:.4f}A")

                self.update_signal.emit(voltage, current, current_time)
                self._publish_latest(voltage, current)

                # Never poll faster than every 50 ms
                time.sleep(max(0.05, self.interval))
            except DeviceDisconnectedError as e:
                self.error_signal.emit(f"Device disconnected: {str(e)}")
                break
            except ValueError as e:
                # Skip invalid measurements but report the first occurrence
                if consecutive_errors == 0:
                    self.error_signal.emit(f"Measurement error: {str(e)}")
                consecutive_errors += 1
                time.sleep(0.1)
            except Exception as e:
                self.error_signal.emit(f"Read error: {str(e)}")
                consecutive_errors += 1
                time.sleep(1)

            # Handle persistent errors
            if consecutive_errors >= max_consecutive_errors:
                if not self._attempt_recovery():
                    self.error_signal.emit("Critical error - stopping measurement")
                    break
                consecutive_errors = 0

    def set_direction(self, direction):
        """Set current direction (1 for source, -1 for sink)"""
        self.current_direction = direction
class TestSequenceWorker(QObject):
    """Worker that runs charge / rest / discharge / rest cycles.

    Signals:
        finished: always emitted when ``run`` returns.
        update_phase(str): name of the phase that just started.
        update_status(str): status-line text.
        test_completed: emitted when the cycle loop ends without an exception.
        error_occurred(str): emitted when the sequence raised.
    """
    finished = pyqtSignal()
    update_phase = pyqtSignal(str)
    update_status = pyqtSignal(str)
    test_completed = pyqtSignal()
    error_occurred = pyqtSignal(str)

    def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent):
        """Store the test parameters.

        Args:
            device: Device whose channels 'A' and 'B' are driven.
            test_current: Magnitude of the charge/discharge current.
            charge_cutoff: Voltage at which charging stops.
            discharge_cutoff: Voltage at which discharging stops.
            rest_time: Rest duration in hours.
            continuous_mode: Stored as given; ``run`` reads the live state of
                ``parent.continuous_mode_check`` instead.
            parent: Owner providing ``measurement_thread``, ``plot_mutex``,
                the data buffers and the ``request_stop`` flag.
        """
        super().__init__()
        self.device = device
        self.test_current = test_current
        self.charge_cutoff = charge_cutoff
        self.discharge_cutoff = discharge_cutoff
        self.rest_time = rest_time * 3600  # Convert hours to seconds
        self.continuous_mode = continuous_mode
        self.parent = parent
        self._running = True
        self.voltage_timeout = 0.5  # seconds

    def get_latest_measurement(self):
        """Return ``(voltage, current)`` from the queue, or ``(None, None)`` on timeout."""
        try:
            return self.parent.measurement_thread.measurement_queue.get(
                timeout=self.voltage_timeout
            )
        except Empty:
            return (None, None)  # Return tuple for unpacking

    def _stop_requested(self):
        """True when either the worker or its parent asked for a stop."""
        return not self._running or self.parent.request_stop

    def _drive_until(self, signed_current, direction, cutoff_reached):
        """Drive channel A at *signed_current* until *cutoff_reached(voltage)*.

        Channel A is always returned to HI_Z with zero output afterwards.
        """
        try:
            # Channel A sources/sinks current, Channel B measures voltage
            self.device.channels['B'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].mode = pysmu.Mode.SIMV
            self.device.channels['A'].constant(signed_current)
            self.parent.measurement_thread.set_direction(direction)
            # Small delay to allow current to stabilize
            time.sleep(0.1)
            while self._running:
                voltage, current = self.get_latest_measurement()
                if voltage is None:
                    continue
                # Update parent's data for logging/display
                with self.parent.plot_mutex:
                    if len(self.parent.voltage_data) > 0:
                        self.parent.voltage_data[-1] = voltage
                        self.parent.current_data[-1] = current
                if cutoff_reached(voltage):
                    break
                time.sleep(0.1)
        finally:
            self.device.channels['A'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].constant(0)

    def charge_phase(self):
        """Handle the battery charging phase"""
        self.update_phase.emit("Charge")
        self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.4f}A")
        self._drive_until(self.test_current, 1,
                          lambda voltage: voltage >= self.charge_cutoff)

    def discharge_phase(self):
        """Handle the battery discharging phase"""
        voltage, _ = self.get_latest_measurement()
        if voltage is not None and voltage <= self.discharge_cutoff:
            self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)")
            return
        self.update_phase.emit("Discharge")
        self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A")
        self._drive_until(-self.test_current, -1,
                          lambda voltage: voltage <= self.discharge_cutoff)

    def rest_phase(self, phase_name):
        """Handle rest period between phases"""
        self.update_phase.emit(f"Resting ({phase_name})")
        rest_end = time.time() + self.rest_time
        while time.time() < rest_end and self._running:
            time_left = max(0, rest_end - time.time())
            self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min")
            time.sleep(1)

    def stop(self):
        """Request the thread to stop"""
        self._running = False
        try:
            self.device.channels['A'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].constant(0)
            self.device.channels['B'].mode = pysmu.Mode.HI_Z
        except Exception as e:
            print(f"Error stopping device: {e}")

    def run(self):
        """Main test sequence loop"""
        try:
            # Clear a leftover request once, before the first cycle. Clearing
            # it at the top of every cycle would discard a stop requested
            # during the post-discharge rest.
            self.parent.request_stop = False
            first_cycle = True  # Ensure at least one cycle runs
            while (not self._stop_requested() and
                   (self.parent.continuous_mode_check.isChecked() or first_cycle)):
                self.parent.cycle_count += 1
                first_cycle = False  # Only True for the first cycle

                # 1. Charge phase (constant current)
                self.charge_phase()
                if self._stop_requested():
                    break

                # 2. Rest period after charge
                self.rest_phase("Post-Charge")
                if self._stop_requested():
                    break

                # 3. Discharge phase (capacity measurement)
                self.discharge_phase()
                if self._stop_requested():
                    break

                # 4. Rest period after discharge
                self.rest_phase("Post-Discharge")

                # Calculate Coulomb efficiency if not stopping
                if not self.parent.request_stop and self.parent.charge_capacity > 0:
                    self.parent.coulomb_efficiency = (
                        self.parent.capacity_ah / self.parent.charge_capacity
                    ) * 100
            # Test completed
            self.test_completed.emit()
        except Exception as e:
            self.error_occurred.emit(f"Test sequence error: {str(e)}")
        finally:
            self.finished.emit()
class DischargeWorker(QObject):
    """Worker that performs a single discharge down to a cutoff voltage.

    Signals:
        finished: always emitted when ``run`` returns.
        update_status(str): status-line text.
        test_completed: emitted when the discharge ended without a stop request.
        error_occurred(str): emitted when the discharge raised.
    """
    finished = pyqtSignal()
    update_status = pyqtSignal(str)
    test_completed = pyqtSignal()
    error_occurred = pyqtSignal(str)

    def __init__(self, device, test_current, discharge_cutoff, parent):
        """Remember the device, the discharge parameters and the owning object."""
        super().__init__()
        self._running = True
        self.voltage_timeout = 0.5  # seconds
        self.device = device
        self.parent = parent
        self.test_current = test_current
        self.discharge_cutoff = discharge_cutoff

    def get_latest_measurement(self):
        """Fetch ``(voltage, current)`` from the queue; ``(None, None)`` on timeout."""
        try:
            reading = self.parent.measurement_thread.measurement_queue.get(
                timeout=self.voltage_timeout
            )
        except Empty:
            reading = (None, None)  # Still unpackable by the caller
        return reading

    def discharge_phase(self):
        """Sink current on channel A until the voltage reaches the cutoff."""
        cutoff = self.discharge_cutoff
        voltage, _ = self.get_latest_measurement()
        if voltage is not None and voltage <= cutoff:
            self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)")
            return

        self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A")
        try:
            # Channel B only measures; channel A sinks the test current
            self.device.channels['B'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].mode = pysmu.Mode.SIMV
            self.device.channels['A'].constant(-self.test_current)
            self.parent.measurement_thread.set_direction(-1)  # Sink current
            time.sleep(0.1)  # Let the current settle

            while self._running:
                voltage, current = self.get_latest_measurement()
                if voltage is None:
                    continue
                # Overwrite the newest buffered sample for logging/display
                with self.parent.plot_mutex:
                    if len(self.parent.voltage_data) > 0:
                        self.parent.voltage_data[-1] = voltage
                        self.parent.current_data[-1] = current
                if voltage <= self.discharge_cutoff:
                    break
                time.sleep(0.1)
        finally:
            self.device.channels['A'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].constant(0)

    def stop(self):
        """Flag the loop to end and put both channels into HI_Z."""
        self._running = False
        try:
            self.device.channels['A'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].constant(0)
            self.device.channels['B'].mode = pysmu.Mode.HI_Z
        except Exception as e:
            print(f"Error stopping device: {e}")

    def run(self):
        """Run one discharge and report completion unless a stop was requested."""
        try:
            self.parent.request_stop = False
            self.parent.cycle_count = 1  # Only one discharge cycle
            self.discharge_phase()
            if self._running and not self.parent.request_stop:
                self.test_completed.emit()
        except Exception as e:
            self.error_occurred.emit(f"Discharge error: {str(e)}")
        finally:
            self.finished.emit()
class ChargeWorker(QObject):
    """Worker that performs a single charge up to a cutoff voltage.

    Signals:
        finished: always emitted when ``run`` returns.
        update_status(str): status-line text.
        test_completed: emitted when the charge loop ended without an exception.
        error_occurred(str): emitted when the charge raised.
    """
    finished = pyqtSignal()
    update_status = pyqtSignal(str)
    test_completed = pyqtSignal()
    error_occurred = pyqtSignal(str)

    def __init__(self, device, test_current, charge_cutoff, parent):
        """Store the device, the charge parameters and the owning object."""
        super().__init__()
        self.device = device
        self.test_current = test_current
        self.charge_cutoff = charge_cutoff
        self.parent = parent
        self._running = True
        self.voltage_timeout = 0.5  # seconds

    def get_latest_measurement(self):
        """Return ``(voltage, current)`` from the queue, or ``(None, None)`` on timeout.

        Reads the measurement thread's queue directly, like the other
        workers, instead of depending on a helper of the parent object.
        """
        try:
            return self.parent.measurement_thread.measurement_queue.get(
                timeout=self.voltage_timeout
            )
        except Empty:
            return (None, None)  # Return tuple for unpacking

    def _release_output(self):
        """Put channel A into HI_Z with zero output, as the other workers do."""
        self.device.channels['A'].mode = pysmu.Mode.HI_Z
        self.device.channels['A'].constant(0)

    def run(self):
        """Main charge sequence"""
        try:
            self.parent.measurement_thread.set_direction(1)  # Source current
            # Channel A sources current, Channel B measures voltage
            self.device.channels['B'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].mode = pysmu.Mode.SIMV
            self.device.channels['A'].constant(self.test_current)
            time.sleep(0.1)  # Allow current to stabilize
            while self._running:
                voltage, current = self.get_latest_measurement()
                if voltage is None:
                    continue
                # Update parent's data for logging/display
                with self.parent.plot_mutex:
                    if len(self.parent.voltage_data) > 0:
                        self.parent.voltage_data[-1] = voltage
                        self.parent.current_data[-1] = current
                if voltage >= self.charge_cutoff:
                    break
                time.sleep(0.1)
            self.test_completed.emit()
        except Exception as e:
            self.error_occurred.emit(f"Charge error: {str(e)}")
        finally:
            # A failing cleanup must not prevent `finished` from being
            # emitted, otherwise the owner never learns the worker ended.
            try:
                self._release_output()
            except Exception as e:
                print(f"Error stopping charge: {e}")
            self.finished.emit()

    def stop(self):
        """Request the thread to stop"""
        self._running = False
        try:
            self._release_output()
            self.device.channels['B'].mode = pysmu.Mode.HI_Z
        except Exception as e:
            print(f"Error stopping charge: {e}")
class BatteryTester(QMainWindow):
def __init__(self):
    """Initialise state, build the UI, connect to the device and start the status timer."""
    # Lock that the worker objects take before touching the plot data
    self.plot_mutex = threading.Lock()
    super().__init__()

    self.devices = {}  # DeviceManager instances
    self.active_device = None
    self.last_logged_phase = None

    # Colour palette
    self.bg_color, self.fg_color = "#2E3440", "#D8DEE9"
    self.accent_color = "#5E81AC"
    self.warning_color, self.success_color = "#BF616A", "#A3BE8C"

    # Session / run flags
    self.session_active = False
    self.measuring = False
    self.test_running = False
    self.continuous_mode = False
    self.request_stop = False
    self.interval = 0.1

    # Log directory (created on demand)
    self.log_dir = os.path.expanduser("~/adalm1000/logs")
    os.makedirs(self.log_dir, exist_ok=True)

    # Buffers and downsampling (factor 1 = no downsampling yet)
    self.aggregation_buffer = dict(
        time=[], voltage=[], current=[], count=0, last_plot_time=0
    )
    self.phase_data = deque()
    self.downsample_factor = 1
    self.downsample_counter = 0

    # Measurement totals
    self.capacity_ah = 0.0
    self.energy = 0.0
    self.charge_capacity = 0.0
    self.coulomb_efficiency = 0.0
    self.cycle_count = 0
    now = time.time()
    self.start_time = now
    self.last_update_time = now

    # Widgets first, then the hardware
    self.setup_ui()
    self.init_device()

    # Window geometry
    self.setWindowTitle("ADALM1000 - Battery Tester (Multi-Mode)")
    self.resize(1000, 800)
    self.setMinimumSize(800, 700)

    # Refresh status and plot once per second
    self.status_timer = QTimer()
    self.status_timer.timeout.connect(self.update_status_and_plot)
    self.status_timer.start(1000)
def setup_ui(self):
    """Build every widget of the main window and lay them out.

    Creates, top to bottom: the mode/device selector row, the header with
    connection status and reconnect button, the grid of measurement
    readouts, the parameter inputs beside the control buttons, the plot
    (via ``setup_plot``) and the status bar, then applies the dark theme.
    """
    label_style = f"color: {self.fg_color};"
    small_label_style = f"color: {self.fg_color}; font-size: 11px;"
    panel_style = f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}"
    combo_style = f"""
        QComboBox {{
            background-color: #3B4252;
            color: {self.fg_color};
            border: 1px solid #4C566A;
            border-radius: 3px;
            padding: 2px;
        }}
    """

    # Main widget and layout
    self.central_widget = QWidget()
    self.setCentralWidget(self.central_widget)
    self.main_layout = QVBoxLayout(self.central_widget)
    self.main_layout.setContentsMargins(10, 10, 10, 10)

    # --- Mode and device selection ---
    mode_frame = QFrame()
    mode_frame.setFrameShape(QFrame.StyledPanel)
    mode_frame.setStyleSheet(panel_style)
    mode_layout = QHBoxLayout(mode_frame)

    self.mode_label = QLabel("Test Mode:")
    self.mode_label.setStyleSheet(label_style)
    mode_layout.addWidget(self.mode_label)
    self.mode_combo = QComboBox()
    self.mode_combo.addItems(["Live Monitoring", "Discharge Test", "Charge Test", "Cycle Test"])
    self.mode_combo.setStyleSheet(combo_style)
    # Connected after addItems so that filling the combo does not fire it
    self.mode_combo.currentTextChanged.connect(self.change_mode)
    mode_layout.addWidget(self.mode_combo, 1)

    self.device_label = QLabel("Device:")
    self.device_label.setStyleSheet(label_style)
    mode_layout.addWidget(self.device_label)
    self.device_combo = QComboBox()
    self.device_combo.setStyleSheet(combo_style)
    self.device_combo.currentIndexChanged.connect(self.change_device)
    mode_layout.addWidget(self.device_combo, 1)
    self.main_layout.addWidget(mode_frame)

    # --- Header ---
    header_frame = QFrame()
    header_frame.setFrameShape(QFrame.NoFrame)
    header_layout = QHBoxLayout(header_frame)
    header_layout.setContentsMargins(0, 0, 0, 0)
    self.title_label = QLabel("ADALM1000 Battery Tester")
    self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};")
    header_layout.addWidget(self.title_label, 1)

    # Status indicator
    self.status_light = QLabel()
    self.status_light.setFixedSize(20, 20)
    self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
    header_layout.addWidget(self.status_light)
    self.connection_label = QLabel("Disconnected")
    header_layout.addWidget(self.connection_label)

    # Reconnect button
    self.reconnect_btn = QPushButton("Reconnect")
    self.reconnect_btn.clicked.connect(self.reconnect_device)
    header_layout.addWidget(self.reconnect_btn)
    self.main_layout.addWidget(header_frame)

    # --- Measurement readouts (3 per row: caption, value, unit) ---
    display_frame = QFrame()
    display_frame.setFrameShape(QFrame.StyledPanel)
    display_frame.setStyleSheet(panel_style)
    display_layout = QGridLayout(display_frame)
    measurement_labels = [
        ("Voltage", "V"), ("Current", "A"), ("Test Phase", ""),
        ("Elapsed Time", "s"), ("Capacity", "Ah"), ("Power", "W"),
        ("Energy", "Wh"), ("Cycle Count", ""), ("Battery Temp", "°C")
    ]
    value_labels = []
    for i, (label, unit) in enumerate(measurement_labels):
        row = i // 3
        col = (i % 3) * 3
        lbl = QLabel(f"{label}:")
        lbl.setStyleSheet(small_label_style)
        display_layout.addWidget(lbl, row, col)
        value_lbl = QLabel("0.000")
        value_lbl.setStyleSheet(f"""
            color: {self.fg_color};
            font-weight: bold;
            font-size: 12px;
            min-width: 60px;
        """)
        display_layout.addWidget(value_lbl, row, col + 1)
        value_labels.append(value_lbl)
        if unit:
            unit_lbl = QLabel(unit)
            unit_lbl.setStyleSheet(small_label_style)
            display_layout.addWidget(unit_lbl, row, col + 2)
    for i in range(9):
        display_layout.setColumnStretch(i, 1 if i % 3 == 1 else 0)
    # Same order as measurement_labels above
    (self.voltage_label, self.current_label, self.phase_label,
     self.time_label, self.capacity_label, self.power_label,
     self.energy_label, self.cycle_label, self.temp_label) = value_labels
    self.main_layout.addWidget(display_frame)

    # --- Control area ---
    controls_frame = QFrame()
    controls_frame.setFrameShape(QFrame.NoFrame)
    controls_layout = QHBoxLayout(controls_frame)
    controls_layout.setContentsMargins(0, 0, 0, 0)

    # Parameters frame
    self.params_frame = QFrame()
    self.params_frame.setFrameShape(QFrame.StyledPanel)
    self.params_frame.setStyleSheet(panel_style)
    self.params_layout = QGridLayout(self.params_frame)

    def add_param_row(row, caption, default, validator=None):
        """Add one caption + line-edit row and return both widgets."""
        caption_lbl = QLabel(caption)
        caption_lbl.setStyleSheet(label_style)
        self.params_layout.addWidget(caption_lbl, row, 0)
        field = QLineEdit(default)
        if validator is not None:
            field.setValidator(validator)
        self.params_layout.addWidget(field, row, 1)
        return caption_lbl, field

    # The caption gets its own attribute name: storing it in
    # self.capacity_label would replace the capacity readout assigned
    # above, and later setText() calls would rewrite this caption.
    self.capacity_input_label, self.capacity_input = add_param_row(
        0, "Capacity (Ah):", "1.0", QDoubleValidator(0.001, 100, 3))
    self.c_rate_label, self.c_rate_input = add_param_row(
        1, "C-Rate:", "0.1", QDoubleValidator(0.01, 1, 2))
    self.charge_cutoff_label, self.charge_cutoff_input = add_param_row(
        2, "Charge Cutoff (V):", "1.43", QDoubleValidator(0.1, 5.0, 3))
    # NOTE(review): the default "0.01" lies below this validator's minimum
    # of 0.1 - confirm which of the two is intended.
    self.discharge_cutoff_label, self.discharge_cutoff_input = add_param_row(
        3, "Discharge Cutoff (V):", "0.01", QDoubleValidator(0.1, 5.0, 3))
    self.rest_time_label, self.rest_time_input = add_param_row(
        4, "Rest Time (h):", "0.5", QDoubleValidator(0.1, 24, 1))
    self.test_conditions_label, self.test_conditions_input = add_param_row(
        5, "Test Conditions:", "Room Temperature")
    controls_layout.addWidget(self.params_frame, 1)

    # Button frame with single toggle button
    button_frame = QFrame()
    button_frame.setFrameShape(QFrame.NoFrame)
    button_layout = QVBoxLayout(button_frame)
    button_layout.setContentsMargins(0, 0, 0, 0)

    # Single toggle button (Start/Stop)
    self.toggle_button = QPushButton("START")
    self.toggle_button.setCheckable(True)
    self.toggle_button.setStyleSheet(f"""
        QPushButton {{
            background-color: {self.accent_color};
            color: {self.fg_color};
            font-weight: bold;
            padding: 6px;
            border-radius: 4px;
            min-width: 80px;
        }}
        QPushButton:checked {{
            background-color: {self.warning_color};
        }}
        QPushButton:disabled {{
            background-color: #4C566A;
            color: #D8DEE9;
        }}
    """)
    self.toggle_button.clicked.connect(self.toggle_test)
    button_layout.addWidget(self.toggle_button)

    # Continuous mode checkbox (only shown for Cycle mode)
    self.continuous_mode_check = QCheckBox("Continuous Mode")
    self.continuous_mode_check.setChecked(True)
    self.continuous_mode_check.setStyleSheet(label_style)
    button_layout.addWidget(self.continuous_mode_check)
    self.continuous_mode_check.hide()

    # Record button (only shown for Live mode)
    self.record_button = QPushButton("Start Recording")
    self.record_button.setCheckable(True)
    self.record_button.setStyleSheet(f"""
        QPushButton {{
            background-color: {self.success_color};
            color: {self.fg_color};
            font-weight: bold;
            padding: 6px;
            border-radius: 4px;
        }}
        QPushButton:checked {{
            background-color: {self.warning_color};
        }}
    """)
    self.record_button.clicked.connect(self.toggle_recording)
    button_layout.addWidget(self.record_button)
    self.record_button.hide()
    controls_layout.addWidget(button_frame)
    self.main_layout.addWidget(controls_frame)

    # Plot area
    self.setup_plot()

    # Status bar
    self.status_bar = self.statusBar()
    self.status_bar.setStyleSheet(label_style)
    self.status_bar.showMessage("Ready")

    # Apply dark theme
    self.setStyleSheet(f"""
        QMainWindow {{
            background-color: {self.bg_color};
        }}
        QLabel {{
            color: {self.fg_color};
        }}
        QLineEdit {{
            background-color: #3B4252;
            color: {self.fg_color};
            border: 1px solid #4C566A;
            border-radius: 3px;
            padding: 2px;
        }}
    """)
def toggle_test(self):
    """Start or stop the test according to the toggle button's checked state."""
    starting = self.toggle_button.isChecked()
    # A checked button offers "STOP"; an unchecked one offers "START"
    self.toggle_button.setText("STOP" if starting else "START")
    action = self.start_test if starting else self.stop_test
    action()
def change_mode(self, mode_name):
    """Switch to test mode *mode_name* and reset the run state.

    Stops any running operation, shows only the inputs and buttons that
    belong to the mode, clears the active device's data, zeroes the
    readouts and resets the plot.

    Args:
        mode_name: One of "Live Monitoring", "Discharge Test",
            "Charge Test" or "Cycle Test".
    """
    self.current_mode = mode_name
    self.stop_test()  # Stop any current operation

    # Show/hide mode-specific UI elements
    show_charge = mode_name in ["Cycle Test", "Charge Test"]
    show_discharge = mode_name in ["Cycle Test", "Discharge Test"]
    show_rest = mode_name == "Cycle Test"
    self.charge_cutoff_label.setVisible(show_charge)
    self.charge_cutoff_input.setVisible(show_charge)
    self.discharge_cutoff_label.setVisible(show_discharge)
    self.discharge_cutoff_input.setVisible(show_discharge)
    self.rest_time_label.setVisible(show_rest)
    self.rest_time_input.setVisible(show_rest)
    # Continuous mode checkbox only for cycle test
    self.continuous_mode_check.setVisible(mode_name == "Cycle Test")
    # Record button only for live monitoring
    self.record_button.setVisible(mode_name == "Live Monitoring")

    # Set button text based on mode
    button_text = {
        "Cycle Test": "START CYCLE TEST",
        "Discharge Test": "START DISCHARGE",
        "Charge Test": "START CHARGE",
        "Live Monitoring": "START",
    }
    if mode_name in button_text:
        self.toggle_button.setText(button_text[mode_name])
    # Set the visibility explicitly in both directions: only hiding the
    # button for Live Monitoring would leave it hidden after switching back.
    self.toggle_button.setVisible(mode_name != "Live Monitoring")
    # Reset button state
    self.toggle_button.setChecked(False)

    # Reset measurement state and zero the time
    if self.active_device:
        dev = self.active_device
        dev.reset_data()  # This clears all data buffers
        # The attribute can exist and still be None, so test the value
        thread = getattr(dev, 'measurement_thread', None)
        if thread is not None:
            thread.start_time = time.time()

    # Reset UI displays
    self.capacity_label.setText("0.0000")
    self.energy_label.setText("0.0000")
    self.cycle_label.setText("0")
    self.phase_label.setText("Idle")
    self.time_label.setText("00:00:00")

    # Reset plot
    self.reset_plot()
    self.status_bar.showMessage(f"Mode changed to {mode_name}")
def reset_test(self):
    """Clear the active device's data and zero the readouts; no-op without a device."""
    manager = self.active_device
    if not manager:
        return
    manager.reset_data()  # Buffers and counters live in the DeviceManager
    # Reset the UI
    readout_defaults = (
        (self.capacity_label, "0.0000"),
        (self.energy_label, "0.0000"),
        (self.cycle_label, "0"),
        (self.phase_label, "Idle"),
    )
    for widget, text in readout_defaults:
        widget.setText(text)
def toggle_recording(self):
    """Start or stop data recording in Live Monitoring mode.

    The record button's checked state selects the action. Whatever
    happens, the button caption is left matching its checked state.
    """
    if self.record_button.isChecked():
        # Start recording
        try:
            # Reset previous data
            self.reset_test()
            # Reset measurement timing (attribute only exists once a device
            # has been initialised)
            thread = getattr(self, 'measurement_thread', None)
            if thread is not None:
                thread.start_time = time.time()
            if self.create_cycle_log_file():
                self.record_button.setText("Stop Recording")
                self.status_bar.showMessage("Live recording started")
                # Ensure monitoring is running
                if not self.test_running:
                    self.start_live_monitoring()
            else:
                self.record_button.setChecked(False)
                self.current_cycle_file = None
        except Exception as e:
            print(f"Error starting recording: {e}")
            self.record_button.setChecked(False)
            # The caption may already read "Stop Recording" if the failure
            # happened after the log file was created.
            self.record_button.setText("Start Recording")
            self.current_cycle_file = None
            QMessageBox.critical(self, "Error", f"Failed to start recording:\n{str(e)}")
    else:
        # Stop recording
        try:
            if getattr(self, 'current_cycle_file', None) is not None:
                self.finalize_log_file()
            self.status_bar.showMessage("Live recording stopped")
        except Exception as e:
            print(f"Error stopping recording: {e}")
        finally:
            # The button is unchecked at this point, so the caption must
            # offer "Start" even when finalising the log failed.
            self.record_button.setText("Start Recording")
def handle_continuous_mode_change(self, state):
    """Tell the user when continuous mode is switched off while a test runs."""
    # Only the "unchecked during a running test" case needs feedback
    if state or not self.test_running:
        return

    self.status_bar.showMessage("Continuous mode disabled - will complete current cycle")
    self.continuous_mode_check.setStyleSheet(f"color: {self.warning_color};")

    def restore_style():
        self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")

    # Highlight the checkbox for two seconds, then restore its colour
    QTimer.singleShot(2000, restore_style)
def setup_plot(self):
    """Create the voltage/current figure and embed its canvas in the main layout."""
    figure = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color)
    figure.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
    self.fig = figure

    # Left axis: voltage
    voltage_axis = figure.add_subplot(111)
    self.ax = voltage_axis
    voltage_axis.set_facecolor('#3B4252')
    voltage_padding = 0.2
    voltage_axis.set_ylim(max(0, 0.9 - voltage_padding), 1.43 + voltage_padding)
    voltage_color = '#00BFFF'
    self.line_voltage, = voltage_axis.plot(
        [], [], color=voltage_color, label='Voltage (V)', linewidth=2
    )
    voltage_axis.set_ylabel("Voltage (V)", color=voltage_color)
    voltage_axis.tick_params(axis='y', labelcolor=voltage_color)

    # Right axis: current, symmetric around zero
    current_axis = voltage_axis.twinx()
    self.ax2 = current_axis
    current_padding = 0.05
    test_current = 0.1 * 0.2  # Default values
    max_current = test_current * 1.5
    current_axis.set_ylim(-max_current - current_padding, max_current + current_padding)
    self.line_current, = current_axis.plot([], [], 'r-', label='Current (A)', linewidth=2)
    current_axis.set_ylabel("Current (A)", color='r')
    current_axis.tick_params(axis='y', labelcolor='r')

    # Shared decorations
    voltage_axis.set_xlabel('Time (s)', color=self.fg_color)
    voltage_axis.set_title('Battery Test', color=self.fg_color)
    voltage_axis.tick_params(axis='x', colors=self.fg_color)
    voltage_axis.grid(True, color='#4C566A')

    # One legend per axis, in opposite top corners
    voltage_axis.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
    current_axis.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))

    # Embed the canvas; stretch factor 1 lets it take the spare height
    canvas = FigureCanvas(figure)
    canvas.setStyleSheet(f"background-color: {self.bg_color};")
    self.canvas = canvas
    self.main_layout.addWidget(canvas, 1)
def init_device(self):
    """Open a pysmu session, start every attached ADALM1000 and wire up the UI.

    Steps:
      1. End a session left over from an earlier call (best effort).
      2. Create a new session and rescan up to three times, one second
         apart, until at least one device shows up.
      3. Without devices, delegate to ``handle_no_devices()`` and return.
      4. Wrap each device in a ``DeviceManager``, start its measurement and
         store it in ``self.devices`` keyed by serial number.
      5. Make the first device active, fill the device combo box and
         connect the active device's measurement signals.

    Any exception raised along the way is forwarded to
    ``handle_device_error``.
    """
    try:
        # Cleanup previous session; a failure here must not block a reconnect
        if hasattr(self, 'session'):
            try:
                self.session.end()
            except Exception as e:
                # Was a bare ``except: pass``: keep it best-effort, but do not
                # swallow KeyboardInterrupt/SystemExit and leave a trace.
                print(f"Error ending previous session: {e}")
        self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)

        # Retry mechanism with progress feedback
        retry_count = 0
        while not self.session.devices and retry_count < 3:
            self.status_bar.showMessage(f"Scanning for devices... (Attempt {retry_count + 1}/3)")
            QApplication.processEvents()  # Keep the UI painted while we block
            time.sleep(1)
            self.session.scan()  # Manual scan
            retry_count += 1
        if not self.session.devices:
            self.handle_no_devices()
            return

        # NOTE(review): 0 presumably requests continuous sampling - confirm
        # against the pysmu Session.start documentation.
        self.session.start(0)
        self.devices = {}
        for dev in self.session.devices:
            manager = DeviceManager(dev)
            manager.start_measurement(interval=self.interval)
            self.devices[dev.serial] = manager

        # Select first device
        first_serial = next(iter(self.devices))
        self.active_device = self.devices[first_serial]

        # Update UI
        self.device_combo.clear()
        for serial in self.devices:
            self.device_combo.addItem(serial)
        self.device_combo.setCurrentText(first_serial)
        self.session_active = True
        self.connection_label.setText(f"Connected: {first_serial}")
        self.status_light.setStyleSheet("background-color: green; border-radius: 10px;")
        self.toggle_button.setEnabled(True)

        # Connect measurement signals of the active device only
        self.measurement_thread = self.active_device.measurement_thread
        self.measurement_thread.update_signal.connect(self.update_measurements)
        self.measurement_thread.error_signal.connect(self.handle_device_error)
    except Exception as e:
        self.handle_device_error(f"Initialization failed: {str(e)}")
def handle_no_devices(self):
    """Switch the window into its "no hardware found" state.

    Clears the session/device state, shows a red status light and message,
    disables the start button, empties the device list and reveals the
    reconnect button.
    """
    # Internal state: nothing connected
    self.session_active = False
    self.active_device = None

    # Visual feedback
    self.status_bar.showMessage("No ADALM1000 devices found")
    self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
    self.start_button.setEnabled(False)
    self.device_combo.clear()

    # Offer a way to try again
    reconnect = self.reconnect_btn
    reconnect.setEnabled(True)
    reconnect.setVisible(True)
def request_usb_permissions(self):
    """Ask the user how to resolve missing USB permissions and act on it.

    Shows a dialog with three choices:

    * "Run as Administrator" - replaces the current process with the same
      command line run under ``sudo`` (does not return on success).
    * "Fix Permissions" - writes a udev rule for the ADALM1000 USB IDs and
      reloads the udev rules. On any failure the commands to run manually
      are shown instead.
    * Cancel - nothing happens.
    """
    # Imported locally: neither module is among this file's top-level imports.
    import subprocess
    import sys

    msg = QMessageBox(self)
    msg.setIcon(QMessageBox.Critical)
    msg.setWindowTitle("USB Permission Required")
    msg.setText("Permission needed to access ADALM1000 devices")
    msg.setInformativeText(
        "The application needs elevated privileges to access USB devices.\n\n"
        "Please choose an option:"
    )
    # Add buttons
    sudo_button = msg.addButton("Run as Administrator", QMessageBox.ActionRole)
    udev_button = msg.addButton("Fix Permissions", QMessageBox.ActionRole)
    msg.addButton(QMessageBox.Cancel)
    msg.exec_()

    clicked = msg.clickedButton()
    if clicked == sudo_button:
        # Restart with sudo: "sudo <python> <script> <args...>"
        QMessageBox.information(self, "Restarting",
                                "The application will restart with administrator privileges")
        os.execvp("sudo", ["sudo", sys.executable] + sys.argv)
    elif clicked == udev_button:
        # Create udev rule
        rule_path = "/etc/udev/rules.d/52-adalm1000.rules"
        rule_content = (
            '# ADALM1000 USB permissions\n'
            'SUBSYSTEM=="usb", ATTR{idVendor}=="064b", ATTR{idProduct}=="784c", MODE="0666"\n'
        )
        try:
            # Writing below /etc normally requires root; failure lands in except
            with open(rule_path, "w") as f:
                f.write(rule_content)
            # Apply rules; check=True turns a failing udevadm into an error
            # instead of silently reporting success.
            subprocess.run(["sudo", "udevadm", "control", "--reload-rules"], check=True)
            subprocess.run(["sudo", "udevadm", "trigger"], check=True)
        except Exception as e:
            QMessageBox.critical(self, "Error",
                                 f"Failed to set permissions: {str(e)}\n\n"
                                 "Please run these commands manually:\n\n"
                                 f"echo '{rule_content}' | sudo tee {rule_path}\n"
                                 "sudo udevadm control --reload-rules\n"
                                 "sudo udevadm trigger")
        else:
            QMessageBox.information(self, "Permissions Fixed",
                                    "USB permissions configured. Please reconnect devices.")
def manual_device_init(self):
    """Fall back to two simulated devices when no hardware is detected.

    Fills the device combo box with placeholder entries, marks the
    connection as simulated (orange light), flags the session as active,
    enables the toggle button and warns the user. Any exception is printed
    rather than raised.
    """
    try:
        # Simulate device detection
        combo = self.device_combo
        combo.clear()
        for entry in ("ADALM1000-1 (Simulated)", "ADALM1000-2 (Simulated)"):
            combo.addItem(entry)

        # Mock connection
        self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")
        self.connection_label.setText("Simulated Devices")
        self.session_active = True
        self.toggle_button.setEnabled(True)

        QMessageBox.warning(
            self,
            "Simulation Mode",
            "Using simulated devices - real hardware not detected",
        )
    except Exception as e:
        print(f"Manual init failed: {e}")
def change_device(self, index):
    """Make the device selected in the combo box the active one.

    Stops the measurement thread of the previously active device and
    disconnects its signals, then connects the newly selected device's
    measurement thread to this window, starts it if it is not running and
    refreshes the UI from that device's buffers.

    Args:
        index: Row in ``self.device_combo``; negative values are ignored.
    """
    # Nothing to switch without a live session or a valid row
    if not self.session_active or index < 0:
        return
    serial = self.device_combo.itemText(index)
    # Combo entries without a matching DeviceManager are ignored
    if serial not in self.devices:
        return
    # Stop current device
    if self.active_device:
        try:
            # Disconnect signals safely
            if hasattr(self.active_device, 'measurement_thread'):
                thread = self.active_device.measurement_thread
                try:
                    if thread.isRunning():
                        thread.stop()
                        # Bounded wait so a stuck thread cannot freeze the UI
                        # (presumably milliseconds, as for QThread.wait - confirm)
                        thread.wait(500)
                    thread.update_signal.disconnect()
                    thread.error_signal.disconnect()
                except (RuntimeError, TypeError) as e:
                    # Reported as a warning only; switching continues
                    print(f"Signal disconnect warning: {e}")
                except Exception as e:
                    print(f"Error stopping thread: {e}")
        except Exception as e:
            print(f"Error stopping previous device: {e}")
    # Activate new device
    self.active_device = self.devices[serial]
    # Connect signals to new device
    if hasattr(self.active_device, 'measurement_thread'):
        try:
            self.measurement_thread = self.active_device.measurement_thread
            self.measurement_thread.update_signal.connect(self.update_measurements)
            self.measurement_thread.error_signal.connect(self.handle_device_error)
            # Start measurement only AFTER connecting signals
            if not self.measurement_thread.isRunning():
                self.active_device.start_measurement(self.interval)
        except Exception as e:
            # NOTE: self.active_device already points at the new device here
            print(f"Error connecting to new device: {e}")
            return
    # Update UI with current device data
    self.update_ui_from_active_device()
    self.status_bar.showMessage(f"Switched to device: {serial}")
def update_ui_from_active_device(self):
    """Redraw the plot and refresh all readouts from the active device.

    Does nothing when no device is active. The display buffers are copied
    while holding ``self.plot_mutex``; the latest raw sample (if any) feeds
    the voltage/current/time labels, and the device's counters feed the
    capacity, energy, cycle and phase labels.
    """
    active = self.active_device
    if not active:
        return

    # Copy the current display data while holding the lock
    with self.plot_mutex:
        times = list(active.display_time_data)
        volts = list(active.display_voltage_data)
        amps = list(active.display_current_data)

    # Refresh the plot
    self.line_voltage.set_data(times, volts)
    self.line_current.set_data(times, amps)
    self.auto_scale_axes()
    self.canvas.draw_idle()

    # Refresh the labels from the most recent raw sample
    if active.voltage_data:
        latest_v = active.voltage_data[-1]
        latest_i = active.current_data[-1]
        elapsed = active.time_data[-1] if active.time_data else 0
        self.voltage_label.setText(f"{latest_v:.4f}")
        self.current_label.setText(f"{abs(latest_i):.4f}")
        self.time_label.setText(self.format_time(elapsed))

    self.capacity_label.setText(f"{active.capacity_ah:.4f}")
    self.energy_label.setText(f"{active.energy:.4f}")
    self.cycle_label.setText(str(active.cycle_count))
    self.phase_label.setText(active.test_phase)
@pyqtSlot(float, float, float)
def update_measurements(self, voltage, current, current_time):
    """Store one sample for the active device and refresh the live readouts.

    The sample is appended to the device's raw buffers, folded into the
    display aggregation (one averaged point at most every 0.1 s of wall
    time), integrated into the capacity/energy counters and shown in the
    labels. A plot refresh is scheduled at most every 0.1 s.

    Args:
        voltage: Measured voltage in volts.
        current: Measured current in amperes; its magnitude is integrated.
        current_time: Sample timestamp in seconds.
    """
    if not self.active_device:
        return
    dev = self.active_device

    # Add measurement validation
    if not self.validate_measurements(voltage, current):
        print(f"Invalid measurement: V={voltage:.4f}, I={current:.4f}")
        return

    with self.plot_mutex:
        dev.time_data.append(current_time)
        dev.voltage_data.append(voltage)
        dev.current_data.append(current)

        # Aggregation for display
        agg_buf = dev.aggregation_buffer
        agg_buf['time'].append(current_time)
        agg_buf['voltage'].append(voltage)
        agg_buf['current'].append(current)
        agg_buf['count'] += 1

        now = time.time()
        if now - agg_buf['last_plot_time'] >= 0.1:
            # Emit the mean of everything collected since the last point
            dev.display_time_data.append(np.mean(agg_buf['time']))
            dev.display_voltage_data.append(np.mean(agg_buf['voltage']))
            dev.display_current_data.append(np.mean(agg_buf['current']))
            dev.aggregation_buffer = {
                'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': now
            }

    # Update UI labels
    self.voltage_label.setText(f"{voltage:.4f}")
    self.current_label.setText(f"{abs(current):.4f}")
    self.time_label.setText(self.format_time(current_time))

    # Calculate capacity if we have enough data points
    if len(dev.time_data) > 1:
        delta_t = dev.time_data[-1] - dev.time_data[-2]
        # Integrate forward steps only. A sample that was still queued when
        # the time base was reset is followed by a much smaller timestamp;
        # integrating that negative step would subtract charge and energy.
        if delta_t > 0:
            power = voltage * abs(current)
            dev.capacity_ah += abs(current) * delta_t / 3600
            dev.energy += power * delta_t / 3600
        self.capacity_label.setText(f"{dev.capacity_ah:.4f}")
        self.energy_label.setText(f"{dev.energy:.4f}")

    # Update plot periodically (throttled to one request per 0.1 s)
    now = time.time()
    if now - getattr(self, '_last_plot_update', 0) >= 0.1:
        self._last_plot_update = now
        QTimer.singleShot(0, self.update_plot)
    elif not hasattr(self, '_last_plot_update'):
        self._last_plot_update = 0
def adjust_downsampling(self):
    """Adapt ``self.downsample_factor`` to the amount of buffered data.

    The factor doubles (capped at 64) once the buffer holds more than 1.5x
    ``self.max_points_to_keep`` samples and halves (never below 1) once it
    holds fewer than half of it. A changed factor is announced in the
    status bar for two seconds.
    """
    n_points = len(self.time_data)
    limit = self.max_points_to_keep

    if n_points > limit * 1.5:
        # Double the factor, but never beyond 64
        proposed = min(64, max(1, self.downsample_factor * 2))
    elif n_points >= limit // 2:
        # Buffer size is within the comfortable band: leave things alone
        return
    else:
        # Halve the factor, but never below 1
        proposed = max(1, self.downsample_factor // 2)

    if proposed == self.downsample_factor:
        return
    self.downsample_factor = proposed
    self.status_bar.showMessage(
        f"Downsampling: Factor {self.downsample_factor}", 2000)
def update_status_and_plot(self):
    """Run one periodic refresh: status first, then the plot."""
    self.update_status()  # readouts and logging
    self.update_plot()    # redraw afterwards
def update_status(self):
    """Refresh the capacity readout and append one log row per second.

    While a test is running (or the record button is checked) the capacity
    label is refreshed from the active device. If a log file is open, the
    most recent sample is written to it at most once per second; a write
    failure closes the file and ends recording.

    The capacity itself is NOT integrated here: ``update_measurements``
    already adds every sample interval to ``dev.capacity_ah``. Adding the
    last interval again on every call of this method counted charge twice.
    """
    now = time.time()
    if not self.active_device:
        return
    dev = self.active_device

    if self.test_running or (hasattr(self, 'record_button') and self.record_button.isChecked()):
        if dev.time_data:
            self.capacity_label.setText(f"{dev.capacity_ah:.4f}")

    # Logging (1x per second) - only with a writer and an open file
    log_file = getattr(self, 'current_cycle_file', None)
    if not hasattr(self, 'log_writer') or log_file is None or log_file.closed:
        return
    if not hasattr(self, '_last_log_time'):
        self._last_log_time = now
    if not dev.time_data or now - self._last_log_time < 1.0:
        return

    try:
        current_time = dev.time_data[-1]
        voltage = dev.voltage_data[-1]
        current = dev.current_data[-1]
        if self.current_mode == "Cycle Test":
            row = [
                f"{current_time:.4f}",
                f"{voltage:.6f}",
                f"{current:.6f}",
                dev.test_phase,
                f"{dev.capacity_ah:.4f}",
                f"{dev.charge_capacity:.4f}",
                f"{dev.coulomb_efficiency:.1f}",
                f"{dev.cycle_count}",
            ]
        else:
            row = [
                f"{current_time:.4f}",
                f"{voltage:.6f}",
                f"{current:.6f}",
                dev.test_phase if hasattr(dev, 'test_phase') else "Live",
                f"{dev.capacity_ah:.4f}",
                f"{voltage * current:.4f}",  # Power
                f"{dev.energy:.4f}",  # Energy
                f"{dev.cycle_count}" if hasattr(dev, 'cycle_count') else "1",
            ]
        self.log_writer.writerow(row)
        # Flush every row so a crash loses at most one second of data
        log_file.flush()
        self._last_log_time = now
    except Exception as e:
        print(f"Error writing to log file: {e}")
        try:
            log_file.close()
        except Exception as close_error:
            print(f"Error closing log file: {close_error}")
        # The record button is optional (see the hasattr check above)
        if hasattr(self, 'record_button'):
            self.record_button.setChecked(False)
        self.current_cycle_file = None
def start_test(self):
    """Start the test selected in ``self.current_mode`` on the active device.

    Sequence: stop leftovers of a previous run, clear the active device's
    buffers and counters, put both SMU channels into high impedance, read
    and validate all parameters from the input fields, open the log file
    and finally launch the mode-specific worker in its own ``QThread`` (or
    hand over to ``start_live_monitoring``).

    All parameters, including the mode-specific cutoffs and rest time, are
    read BEFORE the log file is created, because the log header records
    them. Failures are reported in a message box; once the running state
    has been set they also call ``stop_test()`` to roll the UI back.
    """
    if not self.active_device:
        QMessageBox.warning(self, "No Device", "No ADALM1000 device selected.")
        self.toggle_button.setChecked(False)
        return
    dev_manager = self.active_device
    dev = dev_manager.dev

    # Clean up any previous test
    self.cleanup_test_threads()
    # Reset test state for active device
    self.reset_test()

    # Reset measurement thread timing
    dev_manager.measurement_thread.start_time = time.time()
    dev_manager.measurement_thread.voltage_window.clear()
    dev_manager.measurement_thread.current_window.clear()
    with dev_manager.measurement_thread.measurement_queue.mutex:
        dev_manager.measurement_thread.measurement_queue.queue.clear()

    # Reset data buffers
    dev_manager.time_data.clear()
    dev_manager.voltage_data.clear()
    dev_manager.current_data.clear()
    dev_manager.display_time_data.clear()
    dev_manager.display_voltage_data.clear()
    dev_manager.display_current_data.clear()
    dev_manager.aggregation_buffer = {
        'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0
    }

    # Reset device state: both channels high impedance
    try:
        dev.channels['A'].mode = pysmu.Mode.HI_Z
        dev.channels['A'].constant(0)
        dev.channels['B'].mode = pysmu.Mode.HI_Z
        dev.channels['B'].constant(0)
    except Exception as e:
        QMessageBox.critical(self, "Device Error", f"Failed to reset device: {e}")
        self.toggle_button.setChecked(False)
        return

    # Reset test variables
    dev_manager.capacity_ah = 0.0
    dev_manager.energy = 0.0
    dev_manager.charge_capacity = 0.0
    dev_manager.coulomb_efficiency = 0.0
    dev_manager.cycle_count = 0
    dev_manager.start_time = time.time()
    dev_manager.test_phase = "Running"

    # Set global state
    self.test_running = True
    self.request_stop = False

    # Update UI
    self.phase_label.setText(dev_manager.test_phase)
    self.toggle_button.setText("STOP")

    # Get parameters from UI
    try:
        self.capacity = float(self.capacity_input.text())
        self.c_rate = float(self.c_rate_input.text())
        if self.current_mode != "Live Monitoring":
            # A zero or negative value would yield a zero or reversed test current
            if self.capacity <= 0:
                raise ValueError("Battery capacity must be positive")
            if self.c_rate <= 0:
                raise ValueError("C-rate must be positive")
        test_current = self.c_rate * self.capacity
        if test_current > 0.2:
            raise ValueError("Current must be ≤ 200mA (0.2A) for ADALM1000")
    except ValueError as e:
        QMessageBox.critical(self, "Input Error", str(e))
        self.stop_test()
        return

    # Mode-specific limits: needed by the log header, so read them first
    try:
        if self.current_mode == "Cycle Test":
            self.charge_cutoff = float(self.charge_cutoff_input.text())
            self.discharge_cutoff = float(self.discharge_cutoff_input.text())
            self.rest_time = float(self.rest_time_input.text())
            if self.charge_cutoff <= self.discharge_cutoff:
                raise ValueError("Charge cutoff must be higher than discharge cutoff")
        elif self.current_mode == "Discharge Test":
            self.discharge_cutoff = float(self.discharge_cutoff_input.text())
        elif self.current_mode == "Charge Test":
            self.charge_cutoff = float(self.charge_cutoff_input.text())
    except Exception as e:
        QMessageBox.critical(self, "Error", str(e))
        self.stop_test()
        return

    # Create log file
    if not self.create_cycle_log_file():
        self.stop_test()
        return

    # Start the appropriate test
    if self.current_mode == "Cycle Test":
        try:
            # Start test sequence
            self.test_sequence_thread = QThread()
            self.test_sequence_worker = TestSequenceWorker(
                dev,
                test_current,
                self.charge_cutoff,
                self.discharge_cutoff,
                self.rest_time,
                self.continuous_mode_check.isChecked(),
                self
            )
            self.test_sequence_worker.moveToThread(self.test_sequence_thread)
            self.test_sequence_worker.update_phase.connect(self.update_test_phase)
            self.test_sequence_worker.update_status.connect(self.status_bar.showMessage)
            self.test_sequence_worker.test_completed.connect(self.finalize_test)
            self.test_sequence_worker.error_occurred.connect(self.handle_test_error)
            # Worker finished -> thread quits -> both objects delete themselves
            self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit)
            self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater)
            self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater)
            self.test_sequence_thread.start()
            QTimer.singleShot(0, self.test_sequence_worker.run)
            self.status_bar.showMessage(f"Cycle test started | Device: {dev.serial} | Current: {test_current:.4f}A")
        except Exception as e:
            QMessageBox.critical(self, "Error", str(e))
            self.stop_test()
    elif self.current_mode == "Discharge Test":
        try:
            self.discharge_thread = QThread()
            self.discharge_worker = DischargeWorker(dev, test_current, self.discharge_cutoff, self)
            self.discharge_worker.moveToThread(self.discharge_thread)
            self.discharge_worker.update_status.connect(self.status_bar.showMessage)
            self.discharge_worker.test_completed.connect(self.finalize_test)
            self.discharge_worker.error_occurred.connect(self.handle_test_error)
            self.discharge_worker.finished.connect(self.discharge_thread.quit)
            self.discharge_worker.finished.connect(self.discharge_worker.deleteLater)
            self.discharge_thread.finished.connect(self.discharge_thread.deleteLater)
            self.discharge_thread.start()
            QTimer.singleShot(0, self.discharge_worker.run)
            self.status_bar.showMessage(f"Discharge test started | Device: {dev.serial} | Current: {test_current:.4f}A")
        except Exception as e:
            QMessageBox.critical(self, "Error", str(e))
            self.stop_test()
    elif self.current_mode == "Charge Test":
        try:
            self.charge_thread = QThread()
            self.charge_worker = ChargeWorker(dev, test_current, self.charge_cutoff, self)
            self.charge_worker.moveToThread(self.charge_thread)
            self.charge_worker.update_status.connect(self.status_bar.showMessage)
            self.charge_worker.test_completed.connect(self.finalize_test)
            self.charge_worker.error_occurred.connect(self.handle_test_error)
            self.charge_worker.finished.connect(self.charge_thread.quit)
            self.charge_worker.finished.connect(self.charge_worker.deleteLater)
            self.charge_thread.finished.connect(self.charge_thread.deleteLater)
            self.charge_thread.start()
            QTimer.singleShot(0, self.charge_worker.run)
            self.status_bar.showMessage(f"Charge test started | Device: {dev.serial} | Current: {test_current:.4f}A")
        except Exception as e:
            QMessageBox.critical(self, "Error", str(e))
            self.stop_test()
    elif self.current_mode == "Live Monitoring":
        self.start_live_monitoring()
def start_cycle_test(self):
    """Start the battery cycle test.

    Tears down a previous test-sequence worker/thread, resets state and
    plot, validates the inputs, creates a log file and launches a
    ``TestSequenceWorker`` in a new ``QThread``. Errors are shown in a
    message box and the toggle button is put back into its START state.

    NOTE(review): this method clears ``self.time_data`` and related
    attributes on the window itself, whereas ``start_test()`` works on the
    active DeviceManager's buffers - confirm whether this path is still in
    use.
    """
    # Clean up any previous test
    if hasattr(self, 'test_sequence_worker'):
        try:
            self.test_sequence_worker.stop()
        except:
            pass
        self.test_sequence_worker.deleteLater()
    if hasattr(self, 'test_sequence_thread'):
        self.test_sequence_thread.quit()
        # No timeout: blocks until the old thread has finished
        self.test_sequence_thread.wait()
        self.test_sequence_thread.deleteLater()
        del self.test_sequence_thread
    self.reset_test()
    self.reset_plot()
    if hasattr(self, 'measurement_thread'):
        self.measurement_thread.start_time = time.time()
    # Reset stop flag
    self.request_stop = False
    if not self.test_running:
        try:
            # Get parameters from UI
            self.capacity = float(self.capacity_input.text())
            self.charge_cutoff = float(self.charge_cutoff_input.text())
            self.discharge_cutoff = float(self.discharge_cutoff_input.text())
            self.rest_time = float(self.rest_time_input.text())
            self.c_rate = float(self.c_rate_input.text())
            # Validate inputs
            if self.capacity <= 0:
                raise ValueError("Battery capacity must be positive")
            if self.charge_cutoff <= self.discharge_cutoff:
                raise ValueError("Charge cutoff must be higher than discharge cutoff")
            if self.c_rate <= 0:
                raise ValueError("C-rate must be positive")
            test_current = self.c_rate * self.capacity
            if test_current > 0.2:
                raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
            # Clear ALL previous data completely
            with self.plot_mutex:
                self.time_data.clear()
                self.voltage_data.clear()
                self.current_data.clear()
                self.phase_data.clear()
            # Reset capacities and timing
            self.start_time = time.time()
            self.last_update_time = self.start_time
            self.capacity_ah = 0.0
            self.charge_capacity = 0.0
            self.coulomb_efficiency = 0.0
            self.cycle_count = 0
            self.energy = 0.0
            # Reset measurement thread's timer and queues
            if hasattr(self, 'measurement_thread'):
                self.measurement_thread.start_time = time.time()
                self.measurement_thread.voltage_window.clear()
                self.measurement_thread.current_window.clear()
                with self.measurement_thread.measurement_queue.mutex:
                    self.measurement_thread.measurement_queue.queue.clear()
            # Reset plot completely
            self.reset_plot()
            # Start test
            self.test_running = True
            self.start_time = time.time()
            self.last_update_time = time.time()
            self.test_phase = "Initial Discharge"
            self.phase_label.setText(self.test_phase)
            self.toggle_button.setChecked(True)
            self.toggle_button.setText("STOP")
            self.status_bar.showMessage(f"Cycle test started | Current: {test_current:.4f}A")
            # Create log file (the return value is not checked here)
            self.create_cycle_log_file()
            # Start test sequence in a QThread
            self.test_sequence_thread = QThread()
            self.test_sequence_worker = TestSequenceWorker(
                self.active_device.dev,
                test_current,
                self.charge_cutoff,
                self.discharge_cutoff,
                self.rest_time,
                self.continuous_mode_check.isChecked(),
                self  # Pass reference to main window for callbacks
            )
            self.test_sequence_worker.moveToThread(self.test_sequence_thread)
            # Connect signals
            self.test_sequence_worker.update_phase.connect(self.update_test_phase)
            self.test_sequence_worker.update_status.connect(self.status_bar.showMessage)
            self.test_sequence_worker.test_completed.connect(self.finalize_test)
            self.test_sequence_worker.error_occurred.connect(self.handle_test_error)
            # Worker finished -> thread quits -> both objects delete themselves
            self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit)
            self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater)
            self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater)
            # Start the thread and the worker's run method
            self.test_sequence_thread.start()
            QTimer.singleShot(0, self.test_sequence_worker.run)
        except Exception as e:
            QMessageBox.critical(self, "Error", str(e))
            # Ensure buttons are in correct state if error occurs
            self.toggle_button.setChecked(False)
            self.toggle_button.setText("START")
            self.toggle_button.setEnabled(True)
def start_discharge_test(self):
    """Start the battery discharge test.

    Resets state and plot, tears down a previous discharge worker/thread,
    validates the inputs, creates a log file and launches a
    ``DischargeWorker`` in a new ``QThread``. Errors are shown in a message
    box and the toggle button is put back into its START state.

    NOTE(review): this method clears ``self.time_data`` and related
    attributes on the window itself and uses ``start_button`` /
    ``stop_button``, whereas ``start_test()`` works on the active
    DeviceManager and the toggle button - confirm whether this path is
    still in use.
    """
    # Clean up any previous test
    self.reset_test()  # clears time_data, voltage_data, current_data, display_*, phase_data (per the original note)
    self.reset_plot()
    if hasattr(self, 'measurement_thread'):
        self.measurement_thread.start_time = time.time()
    if hasattr(self, 'discharge_worker'):
        try:
            self.discharge_worker.stop()
        except:
            pass
        self.discharge_worker.deleteLater()
    if hasattr(self, 'discharge_thread'):
        self.discharge_thread.quit()
        self.discharge_thread.wait()  # wait without a timeout until it has really finished
        self.discharge_thread.deleteLater()
        del self.discharge_thread
    # Reset stop flag
    self.request_stop = False
    if not self.test_running:
        try:
            # Get parameters from UI
            self.capacity = float(self.capacity_input.text())
            self.discharge_cutoff = float(self.discharge_cutoff_input.text())
            self.c_rate = float(self.c_rate_input.text())
            # Validate inputs
            if self.capacity <= 0:
                raise ValueError("Battery capacity must be positive")
            if self.c_rate <= 0:
                raise ValueError("C-rate must be positive")
            test_current = self.c_rate * self.capacity
            if test_current > 0.2:
                raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
            # Clear ALL previous data completely
            with self.plot_mutex:
                self.time_data.clear()
                self.voltage_data.clear()
                self.current_data.clear()
            # Reset capacities and timing
            self.start_time = time.time()
            self.last_update_time = self.start_time
            self.capacity_ah = 0.0
            self.energy = 0.0
            self.cycle_count = 1
            # Reset measurement thread's timer and queues
            if hasattr(self, 'measurement_thread'):
                self.measurement_thread.start_time = time.time()
                self.measurement_thread.voltage_window.clear()
                self.measurement_thread.current_window.clear()
                with self.measurement_thread.measurement_queue.mutex:
                    self.measurement_thread.measurement_queue.queue.clear()
            # Reset plot completely
            self.reset_plot()
            # Start test
            self.test_running = True
            self.start_time = time.time()
            self.last_update_time = time.time()
            self.test_phase = "Discharge"
            self.phase_label.setText(self.test_phase)
            self.start_button.setEnabled(False)
            self.stop_button.setEnabled(True)
            self.status_bar.showMessage(f"Discharge started | Current: {test_current:.4f}A")
            # Create log file (the return value is not checked here)
            self.create_cycle_log_file()
            # Start discharge worker in a QThread
            self.discharge_thread = QThread()
            self.discharge_worker = DischargeWorker(
                self.active_device.dev,
                test_current,
                self.discharge_cutoff,
                self  # Pass reference to main window for callbacks
            )
            self.discharge_worker.moveToThread(self.discharge_thread)
            # Connect signals
            self.discharge_worker.update_status.connect(self.status_bar.showMessage)
            self.discharge_worker.test_completed.connect(self.finalize_test)
            self.discharge_worker.error_occurred.connect(self.handle_test_error)
            # Worker finished -> thread quits -> both objects delete themselves
            self.discharge_worker.finished.connect(self.discharge_thread.quit)
            self.discharge_worker.finished.connect(self.discharge_worker.deleteLater)
            self.discharge_thread.finished.connect(self.discharge_thread.deleteLater)
            # Start the thread and the worker's run method
            self.discharge_thread.start()
            QTimer.singleShot(0, self.discharge_worker.run)
        except Exception as e:
            QMessageBox.critical(self, "Error", str(e))
            # Ensure buttons are in correct state if error occurs
            self.toggle_button.setChecked(False)
            self.toggle_button.setText("START")
            self.toggle_button.setEnabled(True)
def start_charge_test(self):
    """Start the battery charge test.

    Tears down a previous charge worker/thread, resets state and plot,
    validates the inputs, creates a log file and launches a
    ``ChargeWorker`` in a new ``QThread``. Errors are shown in a message
    box and the toggle button is put back into its START state.

    NOTE(review): this method clears ``self.time_data`` and related
    attributes on the window itself and uses ``start_button`` /
    ``stop_button``, whereas ``start_test()`` works on the active
    DeviceManager and the toggle button - confirm whether this path is
    still in use.
    """
    # Clean up any previous test
    if hasattr(self, 'charge_worker'):
        try:
            self.charge_worker.stop()
        except:
            pass
        self.charge_worker.deleteLater()
    if hasattr(self, 'charge_thread'):
        self.charge_thread.quit()
        # No timeout: blocks until the old thread has finished
        self.charge_thread.wait()
        self.charge_thread.deleteLater()
        del self.charge_thread
    self.reset_test()
    self.reset_plot()
    if hasattr(self, 'measurement_thread'):
        self.measurement_thread.start_time = time.time()
    # Reset stop flag
    self.request_stop = False
    if not self.test_running:
        try:
            # Get parameters from UI
            self.capacity = float(self.capacity_input.text())
            self.charge_cutoff = float(self.charge_cutoff_input.text())
            self.c_rate = float(self.c_rate_input.text())
            # Validate inputs
            if self.capacity <= 0:
                raise ValueError("Battery capacity must be positive")
            if self.c_rate <= 0:
                raise ValueError("C-rate must be positive")
            test_current = self.c_rate * self.capacity
            if test_current > 0.2:
                raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
            # Clear ALL previous data completely
            with self.plot_mutex:
                self.time_data.clear()
                self.voltage_data.clear()
                self.current_data.clear()
            # Reset capacities and timing
            self.start_time = time.time()
            self.last_update_time = self.start_time
            self.capacity_ah = 0.0
            self.energy = 0.0
            self.cycle_count = 1
            # Reset measurement thread
            if hasattr(self, 'measurement_thread'):
                self.measurement_thread.start_time = time.time()
                self.measurement_thread.voltage_window.clear()
                self.measurement_thread.current_window.clear()
                with self.measurement_thread.measurement_queue.mutex:
                    self.measurement_thread.measurement_queue.queue.clear()
            # Reset plot
            self.reset_plot()
            # Start test
            self.test_running = True
            self.start_time = time.time()
            self.last_update_time = time.time()
            self.test_phase = "Charge"
            self.phase_label.setText(self.test_phase)
            self.start_button.setEnabled(False)
            self.stop_button.setEnabled(True)
            self.status_bar.showMessage(f"Charge started @ {test_current:.3f}A to {self.charge_cutoff}V")
            # Create log file (the return value is not checked here)
            self.create_cycle_log_file()
            # Start charge worker in a QThread
            self.charge_thread = QThread()
            self.charge_worker = ChargeWorker(
                self.active_device.dev,
                test_current,
                self.charge_cutoff,
                self
            )
            self.charge_worker.moveToThread(self.charge_thread)
            # Connect signals
            self.charge_worker.update_status.connect(self.status_bar.showMessage)
            self.charge_worker.test_completed.connect(self.finalize_test)
            self.charge_worker.error_occurred.connect(self.handle_test_error)
            # Worker finished -> thread quits -> both objects delete themselves
            self.charge_worker.finished.connect(self.charge_thread.quit)
            self.charge_worker.finished.connect(self.charge_worker.deleteLater)
            self.charge_thread.finished.connect(self.charge_thread.deleteLater)
            # Start the thread
            self.charge_thread.start()
            QTimer.singleShot(0, self.charge_worker.run)
        except Exception as e:
            QMessageBox.critical(self, "Error", str(e))
            # Put the toggle button back into its START state
            self.toggle_button.setChecked(False)
            self.toggle_button.setText("START")
            self.toggle_button.setEnabled(True)
def start_live_monitoring(self):
    """Switch the active device into passive live-monitoring mode.

    Resets the test state and the measurement thread's time base and
    buffers, puts both SMU channels into high impedance, marks the test as
    running in phase "Live Monitoring" and adjusts the buttons (the toggle
    button is hidden in this mode). Failures are forwarded to
    ``handle_device_error``.
    """
    manager = self.active_device
    if not manager:
        QMessageBox.warning(self, "No Device", "No ADALM1000 selected.")
        return
    device = manager.dev

    try:
        # Reset test state for active device
        self.reset_test()

        # Restart the measurement thread's clock and drop stale samples
        if hasattr(manager, 'measurement_thread'):
            worker = manager.measurement_thread
            worker.start_time = time.time()
            worker.voltage_window.clear()
            worker.current_window.clear()
            with worker.measurement_queue.mutex:
                worker.measurement_queue.queue.clear()

        # Reset device state: channel A first, then B
        for channel_name in ('A', 'B'):
            device.channels[channel_name].mode = pysmu.Mode.HI_Z
            device.channels[channel_name].constant(0)

        # Reset UI
        self.test_running = True
        manager.test_phase = "Live Monitoring"
        self.phase_label.setText(manager.test_phase)
        self.stop_button.setEnabled(True)
        self.start_button.setEnabled(False)
        # Hide the toggle button in Live mode
        self.toggle_button.hide()

        # Status
        self.status_bar.showMessage(f"Live monitoring started | Device: {device.serial}")
    except Exception as e:
        self.handle_device_error(f"Failed to start live monitoring: {str(e)}")
def create_cycle_log_file(self):
    """Open a new CSV log for the current test and write its header.

    The file is created in ``self.log_dir`` and named
    ``battery_<mode>_<serial>_<timestamp>.csv`` where ``<mode>`` is
    ``cycle``, ``discharge``, ``charge`` or ``live`` (the latter for any
    other mode). A previously open log is closed first. On success
    ``self.current_cycle_file`` and ``self.log_writer`` refer to the new
    file.

    If writing the header fails, the half-written file is closed and
    ``self.current_cycle_file`` is reset to ``None`` so that no rows are
    appended to a log without a header.

    Returns:
        bool: True when the file is open and the header has been written,
        False on any failure (reported via message box or console).
    """
    try:
        self._last_log_time = time.time()

        # Close the previous file
        if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
            try:
                self.current_cycle_file.close()
            except Exception as e:
                print(f"Error closing previous log file: {e}")
            self.current_cycle_file = None

        # Make sure the log directory exists and is writable
        os.makedirs(self.log_dir, exist_ok=True)
        if not os.access(self.log_dir, os.W_OK):
            QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}")
            return False

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        device_serial = self.active_device.serial if self.active_device else "unknown"

        # File name with mode and serial number. "Charge Test" used to fall
        # through to the live-monitoring prefix.
        prefix = {
            "Cycle Test": "battery_cycle",
            "Discharge Test": "battery_discharge",
            "Charge Test": "battery_charge",
        }.get(self.current_mode, "battery_live")
        self.filename = os.path.join(self.log_dir, f"{prefix}_{device_serial}_{timestamp}.csv")

        # Open the new file and write the header
        log_file = None
        try:
            log_file = open(self.filename, 'w', newline='')
            self.current_cycle_file = log_file
            test_current = self.c_rate * self.capacity
            test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"

            header = [
                f"# ADALM1000 Battery Test Log - {self.current_mode}\n",
                f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
                f"# Device Serial: {device_serial}\n",
                f"# Battery Capacity: {self.capacity} Ah\n",
            ]
            if self.current_mode != "Live Monitoring":
                if self.c_rate:
                    header.append(f"# Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n")
                else:
                    # A zero C-rate has no C/x representation
                    header.append(f"# Test Current: {test_current:.4f} A\n")
            if self.current_mode == "Cycle Test":
                header.append(f"# Charge Cutoff: {self.charge_cutoff} V\n")
                header.append(f"# Discharge Cutoff: {self.discharge_cutoff} V\n")
                header.append(f"# Rest Time: {self.rest_time} hours\n")
            elif self.current_mode == "Discharge Test":
                header.append(f"# Discharge Cutoff: {self.discharge_cutoff} V\n")
            header.append(f"# Test Conditions/Chemistry: {test_conditions}\n")
            header.append("#\n")
            log_file.write("".join(header))

            # CSV writer and column titles
            self.log_writer = csv.writer(log_file)
            if self.current_mode == "Cycle Test":
                self.log_writer.writerow([
                    "Time(s)", "Voltage(V)", "Current(A)", "Phase",
                    "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
                    "Coulomb_Eff(%)", "Cycle"
                ])
            else:
                self.log_writer.writerow([
                    "Time(s)", "Voltage(V)", "Current(A)", "Phase",
                    "Capacity(Ah)", "Power(W)", "Energy(Wh)", "Cycle"
                ])
            return True
        except Exception as e:
            # Do not leave a half-written file registered as the active log
            if log_file is not None:
                try:
                    log_file.close()
                except Exception as close_error:
                    print(f"Error closing incomplete log file: {close_error}")
            self.current_cycle_file = None
            QMessageBox.critical(self, "Error", f"Failed to create log file: {e}")
            return False
    except Exception as e:
        print(f"Error in create_cycle_log_file: {e}")
        return False
def finalize_log_file(self):
    """Append the test summary to the open log file and close it.

    Does nothing when no log file is open. The file is closed and
    ``self.current_cycle_file`` reset to ``None`` in every case, including
    when writing the summary fails, so the handle is never leaked.

    NOTE(review): the result values are read from the window
    (``self.capacity_ah`` etc.), while the per-second log rows use the
    active device's counters - confirm these are kept in sync.
    """
    log_file = getattr(self, 'current_cycle_file', None)
    if not log_file:
        return
    try:
        test_current = self.c_rate * self.capacity
        test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"

        summary = [
            "\n# TEST SUMMARY\n",
            "# Test Parameters:\n",
            f"# - Battery Capacity: {self.capacity} Ah\n",
        ]
        if self.current_mode != "Live Monitoring":
            if self.c_rate:
                summary.append(f"# - Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n")
            else:
                # A zero C-rate has no C/x representation
                summary.append(f"# - Test Current: {test_current:.4f} A\n")
        if self.current_mode == "Cycle Test":
            summary.append(f"# - Charge Cutoff: {self.charge_cutoff} V\n")
            summary.append(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n")
            summary.append(f"# - Rest Time: {self.rest_time} hours\n")
        elif self.current_mode == "Discharge Test":
            summary.append(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n")
        summary.append(f"# - Test Conditions: {test_conditions}\n")

        summary.append("# Results:\n")
        if self.current_mode == "Cycle Test":
            summary.append(f"# - Cycles Completed: {self.cycle_count}\n")
            summary.append(f"# - Final Discharge Capacity: {self.capacity_ah:.4f} Ah\n")
            summary.append(f"# - Final Charge Capacity: {self.charge_capacity:.4f} Ah\n")
            summary.append(f"# - Coulombic Efficiency: {self.coulomb_efficiency:.1f}%\n")
        else:
            summary.append(f"# - Capacity: {self.capacity_ah:.4f} Ah\n")
            summary.append(f"# - Energy: {self.energy:.4f} Wh\n")
        log_file.write("".join(summary))
    except Exception as e:
        print(f"Error closing log file: {e}")
    finally:
        # Close even if the summary could not be written
        try:
            log_file.close()
        except Exception as close_error:
            print(f"Error closing log file: {close_error}")
        self.current_cycle_file = None
def format_time(self, seconds):
    """Render a duration given in seconds as a zero-padded ``hh:mm:ss`` string.

    Fractions of a second are truncated; the hour field grows beyond two
    digits when needed.
    """
    whole_hours, within_hour = divmod(seconds, 3600)
    whole_minutes, within_minute = divmod(within_hour, 60)
    return f"{int(whole_hours):02d}:{int(whole_minutes):02d}:{int(within_minute):02d}"
def stop_test(self):
    """Request an immediate stop of the running test or live monitoring.

    Returns at once when no test is running.  Otherwise it sets
    ``request_stop``, clears ``test_running`` / ``measuring``, asks every
    existing worker to stop, and resets the phase label, the toggle button
    and the status bar.

    Worker shutdown is best-effort: a failure to stop one worker is printed
    and the remaining workers and UI reset still run.
    """
    if not self.test_running:
        return

    self.request_stop = True
    self.test_running = False
    self.measuring = False

    # Ask each worker to stop; skip wrappers whose Qt object is already gone.
    for attr in ('test_sequence_worker', 'discharge_worker', 'charge_worker'):
        worker = getattr(self, attr, None)
        try:
            if worker and not sip.isdeleted(worker):
                worker.stop()
        except Exception as e:
            # Previously a bare ``except: pass``; keep going but say why.
            print(f"Error stopping {attr}: {e}")

    # Reset UI
    if self.active_device:
        self.active_device.test_phase = "Idle"
    self.phase_label.setText("Idle")

    # Reset button state; modes without an entry keep their current label.
    self.toggle_button.setChecked(False)
    start_labels = {
        "Cycle Test": "START CYCLE TEST",
        "Discharge Test": "START DISCHARGE",
        "Charge Test": "START CHARGE",
    }
    start_label = start_labels.get(self.current_mode)
    if start_label is not None:
        self.toggle_button.setText(start_label)

    if self.current_mode == "Live Monitoring":
        self.status_bar.showMessage("Live monitoring stopped")
    else:
        self.status_bar.showMessage("Test stopped by user")
def finalize_test(self):
    """Final cleanup after a test completes or is stopped.

    Steps, in order:
      1. Clear the ``measuring`` / ``test_running`` flags.
      2. Set device channels A and B to ``pysmu.Mode.HI_Z`` with a
         constant output of 0.
      3. Stop and release the test-sequence, discharge and charge
         worker/thread pairs.
      4. Finalize the log file.
      5. Clear ``request_stop`` and reset the toggle button.
      6. Show the completion dialog and status message, but only when the
         test was not stopped by the user.

    Any unexpected error is printed with a traceback and the toggle button
    is re-enabled so the UI is not left locked.
    """
    try:
        # Capture the stop request *before* it is cleared in step 5;
        # checking the flag after the reset would always report a
        # normal completion.
        stopped_by_user = bool(getattr(self, 'request_stop', False))

        # 1. Stop any active measurement or test operations
        self.measuring = False
        self.test_running = False

        # 2. Reset device to safe state
        device = getattr(self, 'active_device', None)
        if device is not None:
            try:
                for channel_name in ('A', 'B'):
                    channel = device.dev.channels[channel_name]
                    channel.mode = pysmu.Mode.HI_Z
                    channel.constant(0)
            except Exception as e:
                print(f"Error resetting device in finalize: {e}")

        # 3. Clean up every worker/thread pair with the same procedure
        for thread_attr, worker_attr, label in (
            ('test_sequence_thread', 'test_sequence_worker', 'test sequence'),
            ('discharge_thread', 'discharge_worker', 'discharge'),
            ('charge_thread', 'charge_worker', 'charge'),
        ):
            if hasattr(self, thread_attr):
                self._release_worker_thread(thread_attr, worker_attr, label)

        # 4. Finalize log file
        self.finalize_log_file()

        # 5. Reset UI and state
        self.request_stop = False
        self.toggle_button.setChecked(False)
        self.toggle_button.setText("START")
        self.toggle_button.setEnabled(True)

        # 6. Show completion message if test wasn't stopped by user
        if not stopped_by_user:
            self._show_completion_summary()
    except Exception as e:
        print(f"Error in finalize_test: {e}")
        import traceback
        traceback.print_exc()
        # Ensure we don't leave the UI in a locked state
        self.toggle_button.setChecked(False)
        self.toggle_button.setText("START")
        self.toggle_button.setEnabled(True)
        self.status_bar.showMessage("Error during test finalization")

def _release_worker_thread(self, thread_attr, worker_attr, label):
    """Stop one worker/thread pair and remove the thread attribute.

    Args:
        thread_attr: Name of the attribute holding the thread object.
        worker_attr: Name of the attribute holding its worker (may be absent).
        label: Human-readable name used in error messages.

    A running thread gets its worker stopped, then ``quit()`` and a 500 ms
    ``wait()``.  Both objects are scheduled with ``deleteLater()`` unless
    sip reports them as already deleted, and the thread attribute is
    removed from ``self``.  Failures are printed and never raised.
    """
    if not hasattr(self, thread_attr):
        return
    thread = getattr(self, thread_attr)
    worker = getattr(self, worker_attr, None)
    try:
        if thread.isRunning():
            if worker is not None:
                try:
                    worker.stop()
                except RuntimeError:
                    # Tolerated as in the original code; presumably the
                    # wrapped Qt object has already been deleted.
                    pass
            thread.quit()
            thread.wait(500)
    except Exception as e:
        print(f"Error stopping {label} thread: {e}")
    finally:
        for qt_object in (worker, thread):
            try:
                if qt_object is not None and not sip.isdeleted(qt_object):
                    qt_object.deleteLater()
            except Exception as e:
                print(f"Error releasing {label} thread objects: {e}")
        if hasattr(self, thread_attr):
            delattr(self, thread_attr)

def _show_completion_summary(self):
    """Show the completion dialog (where one exists) and the status message.

    Cycle and Discharge tests get a detailed dialog; every other mode only
    gets a generic status-bar message, so ``message`` is always defined.
    """
    test_current = self.c_rate * self.capacity
    test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
    message = f"{self.current_mode} completed"

    if self.current_mode == "Cycle Test":
        message = (
            f"Cycle test completed | "
            f"Cycle {self.cycle_count} | "
            f"Capacity: {self.capacity_ah:.4f}Ah | "
            f"Efficiency: {self.coulomb_efficiency:.1f}%"
        )
        QMessageBox.information(
            self,
            "Test Completed",
            f"Cycle test completed successfully.\n\n"
            f"Test Parameters:\n"
            f"- Capacity: {self.capacity} Ah\n"
            f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n"
            f"- Charge Cutoff: {self.charge_cutoff} V\n"
            f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
            f"- Conditions: {test_conditions}\n\n"
            f"Results:\n"
            f"- Cycles: {self.cycle_count}\n"
            f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n"
            f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%"
        )
    elif self.current_mode == "Discharge Test":
        message = (
            f"Discharge completed | "
            f"Capacity: {self.capacity_ah:.4f}Ah | "
            f"Energy: {self.energy:.4f}Wh"
        )
        QMessageBox.information(
            self,
            "Discharge Completed",
            f"Discharge test completed successfully.\n\n"
            f"Test Parameters:\n"
            f"- Capacity: {self.capacity} Ah\n"
            f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n"
            f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
            f"- Conditions: {test_conditions}\n\n"
            f"Results:\n"
            f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n"
            f"- Energy delivered: {self.energy:.4f}Wh"
        )
    self.status_bar.showMessage(message)
def reset_plot(self):
    """Return the plot to its initial empty state and redraw immediately.

    Both traces are emptied, the X window goes back to 0-10, the voltage
    axis to 0-5.0 and the current axis to +/-(0.25 + 0.05).  Labels, tick
    colours, grid and legends are re-applied before a synchronous
    ``canvas.draw()``.
    """
    # Drop whatever is currently plotted.
    for trace in (self.line_voltage, self.line_current):
        trace.set_data([], [])

    x_window = (0, 10)
    voltage_colour = '#00BFFF'

    # Primary (voltage) axis; 5.0 is the ADALM1000 maximum voltage.
    primary = self.ax
    primary.set_xlim(*x_window)
    primary.set_ylim(0, 5.0)
    primary.set_xlabel('Time (s)', color=self.fg_color)
    primary.set_ylabel("Voltage (V)", color=voltage_colour)
    primary.set_title('Battery Test', color=self.fg_color)
    primary.tick_params(axis='x', colors=self.fg_color)
    primary.tick_params(axis='y', labelcolor=voltage_colour)
    primary.grid(True, color='#4C566A')

    # Twin (current) axis, symmetric around zero with a small margin.
    secondary = self.ax2
    current_margin = 0.05
    secondary.set_xlim(*x_window)
    secondary.set_ylim(-0.25 - current_margin, 0.25 + current_margin)
    secondary.set_ylabel("Current (A)", color='r')
    secondary.tick_params(axis='y', labelcolor='r')

    # Legends are rebuilt after the axes were reconfigured.
    primary.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
    secondary.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))

    # Synchronous redraw so the cleared plot is visible at once.
    self.canvas.draw()
def update_status_and_plot(self):
    """Refresh the status readouts first, then redraw the plot."""
    # Looked up one at a time so the second step is only resolved after
    # the first has run.
    for refresh_step in ("update_status", "update_plot"):
        getattr(self, refresh_step)()
def update_plot(self):
    """Push the active device's display buffers into the plot.

    Returns without drawing when there is no active device or its display
    buffer is empty.  The three display buffers are copied while holding
    ``self.plot_mutex``; the traces are then updated, the axes rescaled
    when more than one point exists, and a deferred redraw is requested.

    Any error is printed and answered with ``reset_plot()``; a failure of
    that recovery step is printed as well and never raised.
    """
    try:
        # Read the attribute once so the None check and the data access
        # are guaranteed to refer to the same device object.
        dev = self.active_device
        if not dev:
            return

        # Create local copies of data safely
        with self.plot_mutex:
            if not dev.display_time_data:
                return
            x_data = list(dev.display_time_data)
            y1_data = list(dev.display_voltage_data)
            y2_data = list(dev.display_current_data)

        # Update plot data
        self.line_voltage.set_data(x_data, y1_data)
        self.line_current.set_data(x_data, y2_data)

        # Auto-scale when needed
        if len(x_data) > 1:
            self.auto_scale_axes()

        # Request a redraw on the next event-loop pass
        self.canvas.draw_idle()
    except Exception as e:
        print(f"Plot error: {e}")
        # Attempt to recover
        try:
            self.reset_plot()
        except Exception as reset_error:
            # Was a bare ``except: pass``; still non-fatal, but visible.
            print(f"Plot reset error: {reset_error}")
def auto_scale_axes(self):
    """Grow/adjust the plot limits to fit the active device's data.

    * X: when the newest timestamp passes 95 % of the current upper limit,
      both axes are widened to 0 .. newest * 1.05.
    * Voltage: data range padded by 0.2, clamped to 0 .. 5.0; applied only
      if either limit would move by more than 0.1.
    * Current: data range padded by 0.05, clamped to -0.25 .. 0.25; applied
      only if either limit would move by more than 0.02.

    Does nothing without an active device or without time data.
    """
    dev = self.active_device
    if not dev or not dev.time_data:
        return

    # --- X axis -----------------------------------------------------------
    newest_time = dev.time_data[-1]
    x_limits = self.ax.get_xlim()
    if newest_time > x_limits[1] * 0.95:
        widened_max = newest_time * 1.05
        for axis in (self.ax, self.ax2):
            axis.set_xlim(0, widened_max)

    # --- Voltage axis -----------------------------------------------------
    v_pad = 0.2
    if dev.voltage_data:
        v_low = max(0, min(dev.voltage_data) - v_pad)
        v_high = min(5.0, max(dev.voltage_data) + v_pad)
        shown = self.ax.get_ylim()
        low_moved = abs(shown[0] - v_low) > 0.1
        if low_moved or abs(shown[1] - v_high) > 0.1:
            self.ax.set_ylim(v_low, v_high)

    # --- Current axis -----------------------------------------------------
    i_pad = 0.05
    if dev.current_data:
        i_low = max(-0.25, min(dev.current_data) - i_pad)
        i_high = min(0.25, max(dev.current_data) + i_pad)
        shown2 = self.ax2.get_ylim()
        low_moved2 = abs(shown2[0] - i_low) > 0.02
        if low_moved2 or abs(shown2[1] - i_high) > 0.02:
            self.ax2.set_ylim(i_low, i_high)
@pyqtSlot(str)
def handle_device_error(self, error_msg):
    """Report a device error in the UI and schedule recovery when possible.

    Args:
        error_msg: Error text delivered through the connected error signal.

    USB-related errors turn the status light red and get a hint about the
    connection appended; all other errors turn it orange.  Errors whose
    text contains "No samples" or "timed out" trigger ``reconnect_device``
    after one second.
    """
    print(f"DEVICE ERROR: {error_msg}")

    # Special handling for USB errors.  The check is case-insensitive, so
    # it covers "USB", "LIBUSB" and lower-case spellings alike.
    if "USB" in error_msg.upper():
        error_msg += "\n\nCheck USB connection and permissions"
        light_colour = "red"
    else:
        light_colour = "orange"

    # Include the border-radius that the other status-light updates in this
    # class set, so this handler does not restyle the indicator differently.
    self.status_light.setStyleSheet(
        f"background-color: {light_colour}; border-radius: 10px;"
    )
    self.status_bar.showMessage(f"Device error: {error_msg}")

    # Attempt automatic recovery for non-critical errors
    if "No samples" in error_msg or "timed out" in error_msg:
        QTimer.singleShot(1000, self.reconnect_device)
def validate_measurements(self, voltage, current):
    """Return True when a voltage/current sample is plausible.

    Args:
        voltage: Measured voltage; must lie within 0 .. 5.0.
        current: Measured current; its magnitude must not exceed 0.3.

    Returns:
        False for out-of-range values and for NaN in either argument,
        True otherwise.
    """
    # Both checks are written as "not (inside range)": NaN fails every
    # comparison, so it is rejected instead of slipping through a ">" test.
    if not (0 <= voltage <= 5.0):
        return False
    # 0.3 is the limit the original code enforced; its comment referred to
    # the ADALM1000's +/-200 mA range.
    if not (abs(current) <= 0.3):
        return False
    return True
@pyqtSlot(str)
def update_test_phase(self, phase_text):
    """Store the new test phase and show it in the phase label.

    Args:
        phase_text: Phase name to remember and display.
    """
    self.test_phase = phase_text
    label = self.phase_label
    label.setText(phase_text)
@pyqtSlot(str)
def handle_test_error(self, error_msg):
    """React to an error reported by a test sequence.

    Shows an error dialog, stops the test, blanks the plot (when one
    exists), marks the status light orange and schedules
    ``attempt_reconnect`` one second later.  If this handler itself fails,
    a fatal-error dialog is shown and the window is closed after one second.

    Args:
        error_msg: Error text to present to the user.
    """
    try:
        # Tell the user what happened.
        QMessageBox.critical(
            self,
            "Test Error",
            f"An error occurred:\n{error_msg}\n\nAttempting to recover...",
        )

        # Halt whatever is still running.
        self.stop_test()

        # Blank the plot; problems here are reported but not fatal.
        if hasattr(self, 'line_voltage'):
            try:
                for trace in (self.line_voltage, self.line_current):
                    trace.set_data([], [])
                for axis in (self.ax, self.ax2):
                    axis.set_xlim(0, 1)
                self.canvas.draw()
            except Exception as plot_error:
                print(f"Plot reset error: {plot_error}")

        # Reflect the recovery attempt in the status widgets.
        self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...")
        self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")

        # Give the device a moment before reconnecting.
        QTimer.singleShot(1000, self.attempt_reconnect)
    except Exception as e:
        print(f"Error in error handler: {e}")
        # Last resort: inform the user and close the window.
        QMessageBox.critical(
            self,
            "Fatal Error",
            "The application needs to restart due to an unrecoverable error",
        )
        QTimer.singleShot(1000, self.close)
def attempt_reconnect(self):
    """Tell the user the connection failed, then retry after one second.

    The dialog is modal, so ``reconnect_device`` is only scheduled once the
    user has dismissed it.
    """
    dialog_text = (
        "Could not connect to ADALM1000\n\n"
        "1. Check USB cable connection\n"
        "2. The device will attempt to reconnect automatically"
    )
    QMessageBox.critical(self, "Device Connection Error", dialog_text)
    QTimer.singleShot(1000, self.reconnect_device)
def reconnect_device(self):
"""Re-detect devices with a full session reset and status feedback.

Sequence:
1. Stop the current measurement thread (1 s wait, then terminate).
2. End and drop the existing pysmu session.
3. Up to three attempts: create a new session, scan for devices,
rebuild ``self.devices``, pick the active device, reconnect the
measurement signals and update the UI.

A ``DeviceDisconnectedError`` leads to a retry after ``reconnect_delay``;
once the retries are used up, or on any other exception,
``attempt_reconnect`` is scheduled via ``QTimer.singleShot``.

NOTE(review): this method blocks the calling thread with ``time.sleep``
between scans and retries; UI updates in between rely on the explicit
``QApplication.processEvents()`` calls.
"""
self.status_bar.showMessage("Starting device reconnection...")
self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")
QApplication.processEvents() # force an immediate UI update
# 1. Cleanly shut down existing connections
try:
if hasattr(self, 'measurement_thread'):
self.measurement_thread.stop()
if not self.measurement_thread.wait(1000): # 1 s timeout
self.measurement_thread.terminate()
except Exception as e:
print(f"Error stopping measurement thread: {e}")
# 2. Clean up the old session
if hasattr(self, 'session'):
try:
self.session.end()
except Exception as e:
print(f"Error ending session: {e}")
finally:
# The attribute is removed even when end() failed, so a fresh session
# is always created below.
del self.session
# 3. New session with progress feedback
retry_count = 0
max_retries = 3
reconnect_delay = 2000 # ms
while retry_count < max_retries:
try:
self.status_bar.showMessage(f"Scanning for devices (Attempt {retry_count + 1}/{max_retries})...")
QApplication.processEvents()
# Create a new session
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
# Manual scan, repeated once after a short pause if nothing was found
scan_success = False
for _ in range(2): # at most 2 scan attempts
self.session.scan()
if self.session.devices:
scan_success = True
break
time.sleep(0.5)
if not scan_success:
raise DeviceDisconnectedError("No devices detected")
# 4. Refresh the device list (keyed by serial number)
current_devices = {dev.serial: dev for dev in self.session.devices}
old_devices = self.devices.copy() if hasattr(self, 'devices') else {}
# Add new devices
new_devices = {}
for serial, dev in current_devices.items():
if serial in old_devices:
# Reuse the existing manager
# NOTE(review): the reused manager was created before the old session
# was ended above; confirm its device handle is still usable.
new_devices[serial] = old_devices[serial]
else:
# Initialize a new device
self.status_bar.showMessage(f"Initializing device {serial}...")
QApplication.processEvents()
manager = DeviceManager(dev)
manager.start_measurement(self.interval)
new_devices[serial] = manager
# Remove devices that are no longer present
for serial, manager in old_devices.items():
if serial not in current_devices:
try:
manager.stop_measurement()
except Exception as e:
print(f"Error stopping device {serial}: {e}")
self.devices = new_devices
# 5. Select the active device (keep the previous one when it is still there)
current_serial = self.active_device.serial if (hasattr(self, 'active_device') and self.active_device) else None
# Update the UI
self.device_combo.clear()
for serial in self.devices:
self.device_combo.addItem(serial)
if current_serial in self.devices:
self.device_combo.setCurrentText(current_serial)
self.active_device = self.devices[current_serial]
elif self.devices:
# Previous device is gone: fall back to the first one found
first_serial = next(iter(self.devices))
self.device_combo.setCurrentText(first_serial)
self.active_device = self.devices[first_serial]
else:
raise DeviceDisconnectedError("No valid devices available")
# Connect the measurement signals
# NOTE(review): signals are connected on every successful reconnect; for a
# reused manager this may add duplicate connections - confirm.
if hasattr(self.active_device, 'measurement_thread'):
self.measurement_thread = self.active_device.measurement_thread
self.measurement_thread.update_signal.connect(self.update_measurements)
self.measurement_thread.error_signal.connect(self.handle_device_error)
# Report success
self.connection_label.setText(f"Connected: {self.active_device.serial}")
self.status_bar.showMessage("Successfully reconnected devices")
self.status_light.setStyleSheet("background-color: green; border-radius: 10px;")
self.toggle_button.setEnabled(True)
return
except DeviceDisconnectedError as e:
# No device found: count the attempt, then retry or give up
retry_count += 1
if retry_count < max_retries:
self.status_bar.showMessage(f"Reconnect failed: {e}. Retrying in {reconnect_delay/1000}s...")
QApplication.processEvents()
time.sleep(reconnect_delay/1000)
else:
self.status_bar.showMessage(f"Reconnect failed after {max_retries} attempts")
self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
QTimer.singleShot(reconnect_delay, self.attempt_reconnect)
except Exception as e:
# Any other failure: report it and hand over to attempt_reconnect
print(f"Critical reconnect error: {traceback.format_exc()}")
self.status_bar.showMessage(f"Critical error: {str(e)}")
self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
QTimer.singleShot(reconnect_delay, self.attempt_reconnect)
return
def closeEvent(self, event):
    """Stop background work and release the device session on window close.

    Args:
        event: The close event; it is always accepted.

    Clears the run flags, stops the measurement thread, stops the
    test-sequence, discharge and charge worker/thread pairs (``quit()``
    followed by a 500 ms ``wait()``), and ends the device session.  Every
    step is best-effort: a failure is printed and the remaining steps,
    including ``event.accept()``, still run.
    """
    self.test_running = False
    self.measuring = False
    self.session_active = False

    # Stop measurement thread
    if hasattr(self, 'measurement_thread'):
        try:
            self.measurement_thread.stop()
        except Exception as e:
            print(f"Error stopping measurement thread: {e}")

    # Stop every worker/thread pair; the charge pair is included so that
    # the same three pairs are handled here as in finalize_test.
    for thread_attr, worker_attr, label in (
        ('test_sequence_thread', 'test_sequence_worker', 'test sequence'),
        ('discharge_thread', 'discharge_worker', 'discharge'),
        ('charge_thread', 'charge_worker', 'charge'),
    ):
        if not hasattr(self, thread_attr):
            continue
        worker = getattr(self, worker_attr, None)
        if worker is not None:
            try:
                worker.stop()
            except Exception as e:
                print(f"Error stopping {label} worker: {e}")
        try:
            thread = getattr(self, thread_attr)
            thread.quit()
            thread.wait(500)
        except Exception as e:
            print(f"Error stopping {label} thread: {e}")

    # Clean up device session
    if hasattr(self, 'session') and self.session:
        try:
            self.session.end()
        except Exception as e:
            print(f"Error ending session: {e}")

    event.accept()
# Script entry point: build the Qt application, show the main window and run
# the event loop until the window is closed.
if __name__ == "__main__":
app = QApplication([])
try:
window = BatteryTester()
window.show()
# Blocks until the event loop exits.
app.exec_()
except Exception as e:
# Startup or event-loop failure: print the traceback, then tell the user.
import traceback
traceback.print_exc()
QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}")