Alle Spannungswerte korrekt gemessen werden
Das Programm stabil bleibt, auch ohne angeschlossene Batterie
Der Benutzer klare Rückmeldungen über den Systemzustand erhält
Die Hardware in einem sicheren Zustand bleibt
(D)
2053 lines
85 KiB
Python
2053 lines
85 KiB
Python
# -*- coding: utf-8 -*-
|
|
import os
|
|
import time
|
|
import csv
|
|
import threading
|
|
from datetime import datetime
|
|
import numpy as np
|
|
import matplotlib
|
|
matplotlib.use('Qt5Agg')
|
|
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
|
|
from matplotlib.figure import Figure
|
|
from collections import deque
|
|
from queue import Queue, Full, Empty
|
|
|
|
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel,
|
|
QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog, QComboBox)
|
|
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread
|
|
from PyQt5 import sip
|
|
import pysmu
|
|
|
|
class DeviceDisconnectedError(Exception):
|
|
pass
|
|
|
|
class MeasurementThread(QThread):
|
|
update_signal = pyqtSignal(float, float, float)
|
|
error_signal = pyqtSignal(str)
|
|
|
|
def __init__(self, device, interval=0.1):
|
|
super().__init__()
|
|
self.device = device
|
|
self.interval = interval
|
|
self._running = False
|
|
self.filter_window_size = 10
|
|
self.voltage_window = []
|
|
self.current_window = []
|
|
self.start_time = time.time()
|
|
self.measurement_queue = Queue(maxsize=1)
|
|
self.current_direction = 1 # 1 for source, -1 for sink
|
|
|
|
def run(self):
|
|
"""Continuous measurement with proper validation"""
|
|
self._running = True
|
|
consecutive_errors = 0
|
|
|
|
while self._running:
|
|
try:
|
|
samples = self.device.read(self.filter_window_size, 500, True)
|
|
|
|
# Check for device disconnection
|
|
if not samples:
|
|
consecutive_errors += 1
|
|
if consecutive_errors > 3:
|
|
raise DeviceDisconnectedError("Keine Messwerte empfangen")
|
|
time.sleep(0.1)
|
|
continue
|
|
|
|
consecutive_errors = 0 # Reset counter on successful read
|
|
|
|
current_time = time.time() - self.start_time
|
|
raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B
|
|
|
|
# Strict voltage validation (0-5V range)
|
|
if raw_voltage < 0 or raw_voltage > 5.0:
|
|
raise ValueError(f"Ungültige Spannung: {raw_voltage}V (außerhalb 0-5V Bereich)")
|
|
|
|
# Current measurement with direction
|
|
raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction
|
|
|
|
# Current validation (-200mA to +200mA)
|
|
if not (-0.25 <= raw_current <= 0.25):
|
|
raise ValueError(f"Ungültiger Strom: {raw_current}A (außerhalb ±200mA Bereich)")
|
|
|
|
# Apply filtering
|
|
self.voltage_window.append(raw_voltage)
|
|
self.current_window.append(raw_current)
|
|
|
|
if len(self.voltage_window) > self.filter_window_size:
|
|
self.voltage_window.pop(0)
|
|
self.current_window.pop(0)
|
|
|
|
voltage = np.mean(self.voltage_window)
|
|
current = np.mean(self.current_window)
|
|
|
|
# Emit measurements
|
|
self.update_signal.emit(voltage, current, current_time)
|
|
|
|
# Store in queue
|
|
try:
|
|
self.measurement_queue.put_nowait((voltage, current))
|
|
except Full:
|
|
pass
|
|
|
|
time.sleep(max(0.05, self.interval))
|
|
|
|
except DeviceDisconnectedError as e:
|
|
self.error_signal.emit(f"Gerätefehler: {str(e)}")
|
|
time.sleep(1)
|
|
continue
|
|
except Exception as e:
|
|
self.error_signal.emit(f"Messfehler: {str(e)}")
|
|
time.sleep(0.5)
|
|
continue
|
|
|
|
def set_direction(self, direction):
|
|
"""Set current direction (1 for source, -1 for sink)"""
|
|
self.current_direction = direction
|
|
|
|
def stop(self):
|
|
self._running = False
|
|
self.wait(500)
|
|
|
|
class TestSequenceWorker(QObject):
|
|
finished = pyqtSignal()
|
|
update_phase = pyqtSignal(str)
|
|
update_status = pyqtSignal(str)
|
|
test_completed = pyqtSignal()
|
|
error_occurred = pyqtSignal(str)
|
|
|
|
def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent):
|
|
super().__init__()
|
|
self.device = device
|
|
self.test_current = test_current
|
|
self.charge_cutoff = charge_cutoff
|
|
self.discharge_cutoff = discharge_cutoff
|
|
self.rest_time = rest_time * 3600 # Convert hours to seconds
|
|
self.continuous_mode = continuous_mode
|
|
self.parent = parent
|
|
self._running = True
|
|
self.voltage_timeout = 0.5 # seconds
|
|
|
|
def get_latest_measurement(self):
|
|
"""Thread-safe measurement reading with timeout"""
|
|
try:
|
|
return self.parent.measurement_thread.measurement_queue.get(
|
|
timeout=self.voltage_timeout
|
|
)
|
|
except Empty:
|
|
return (None, None) # Return tuple for unpacking
|
|
|
|
def charge_phase(self):
|
|
"""Handle the battery charging phase"""
|
|
self.update_phase.emit("Charge")
|
|
self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.4f}A")
|
|
|
|
try:
|
|
# Configure channels - Channel A sources current, Channel B measures voltage
|
|
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
|
self.device.channels['A'].mode = pysmu.Mode.SIMV
|
|
self.device.channels['A'].constant(self.test_current)
|
|
self.parent.measurement_thread.set_direction(1) # Source current
|
|
|
|
# Small delay to allow current to stabilize
|
|
time.sleep(0.1)
|
|
|
|
while self._running:
|
|
voltage, current = self.get_latest_measurement()
|
|
if voltage is None:
|
|
continue
|
|
|
|
# Update parent's data for logging/display
|
|
with self.parent.plot_mutex:
|
|
if len(self.parent.voltage_data) > 0:
|
|
self.parent.voltage_data[-1] = voltage
|
|
self.parent.current_data[-1] = current
|
|
|
|
if voltage >= self.charge_cutoff:
|
|
break
|
|
|
|
time.sleep(0.1)
|
|
|
|
finally:
|
|
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.device.channels['A'].constant(0)
|
|
|
|
def discharge_phase(self):
|
|
"""Handle the battery discharging phase"""
|
|
voltage, _ = self.get_latest_measurement()
|
|
if voltage is not None and voltage <= self.discharge_cutoff:
|
|
self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)")
|
|
return
|
|
self.update_phase.emit("Discharge")
|
|
self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A")
|
|
|
|
try:
|
|
# Configure channels - Channel A sinks current, Channel B measures voltage
|
|
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
|
self.device.channels['A'].mode = pysmu.Mode.SIMV
|
|
self.device.channels['A'].constant(-self.test_current)
|
|
self.parent.measurement_thread.set_direction(-1) # Sink current
|
|
|
|
# Small delay to allow current to stabilize
|
|
time.sleep(0.1)
|
|
|
|
while self._running:
|
|
voltage, current = self.get_latest_measurement()
|
|
if voltage is None:
|
|
continue
|
|
|
|
# Update parent's data for logging/display
|
|
with self.parent.plot_mutex:
|
|
if len(self.parent.voltage_data) > 0:
|
|
self.parent.voltage_data[-1] = voltage
|
|
self.parent.current_data[-1] = current
|
|
|
|
if voltage <= self.discharge_cutoff:
|
|
break
|
|
|
|
time.sleep(0.1)
|
|
|
|
finally:
|
|
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.device.channels['A'].constant(0)
|
|
|
|
def rest_phase(self, phase_name):
|
|
"""Handle rest period between phases"""
|
|
self.update_phase.emit(f"Resting ({phase_name})")
|
|
rest_end = time.time() + self.rest_time
|
|
|
|
while time.time() < rest_end and self._running:
|
|
time_left = max(0, rest_end - time.time())
|
|
self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min")
|
|
time.sleep(1)
|
|
|
|
def stop(self):
|
|
"""Request the thread to stop"""
|
|
self._running = False
|
|
try:
|
|
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.device.channels['A'].constant(0)
|
|
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
|
except Exception as e:
|
|
print(f"Error stopping device: {e}")
|
|
|
|
def run(self):
|
|
"""Main test sequence loop"""
|
|
try:
|
|
first_cycle = True # Ensure at least one cycle runs
|
|
|
|
while (self._running and
|
|
(self.parent.continuous_mode_check.isChecked() or first_cycle)):
|
|
self.parent.request_stop = False
|
|
self.parent.cycle_count += 1
|
|
first_cycle = False # Only True for the first cycle
|
|
|
|
# 1. Charge phase (constant current)
|
|
self.charge_phase()
|
|
if not self._running or self.parent.request_stop:
|
|
break
|
|
|
|
# 2. Rest period after charge
|
|
self.rest_phase("Post-Charge")
|
|
if not self._running or self.parent.request_stop:
|
|
break
|
|
|
|
# 3. Discharge phase (capacity measurement)
|
|
self.discharge_phase()
|
|
if not self._running or self.parent.request_stop:
|
|
break
|
|
|
|
# 4. Rest period after discharge (only if not stopping)
|
|
if self._running and not self.parent.request_stop:
|
|
self.rest_phase("Post-Discharge")
|
|
|
|
# Calculate Coulomb efficiency if not stopping
|
|
if not self.parent.request_stop and self.parent.charge_capacity > 0:
|
|
self.parent.coulomb_efficiency = (
|
|
self.parent.capacity_ah / self.parent.charge_capacity
|
|
) * 100
|
|
|
|
# Test completed
|
|
self.test_completed.emit()
|
|
|
|
except Exception as e:
|
|
self.error_occurred.emit(f"Test sequence error: {str(e)}")
|
|
finally:
|
|
self.finished.emit()
|
|
|
|
class DischargeWorker(QObject):
|
|
finished = pyqtSignal()
|
|
update_status = pyqtSignal(str)
|
|
test_completed = pyqtSignal()
|
|
error_occurred = pyqtSignal(str)
|
|
|
|
def __init__(self, device, test_current, discharge_cutoff, parent):
|
|
super().__init__()
|
|
self.device = device
|
|
self.test_current = test_current
|
|
self.discharge_cutoff = discharge_cutoff
|
|
self.parent = parent
|
|
self._running = True
|
|
self.voltage_timeout = 0.5 # seconds
|
|
|
|
def get_latest_measurement(self):
|
|
"""Thread-safe measurement reading with timeout"""
|
|
try:
|
|
return self.parent.measurement_thread.measurement_queue.get(
|
|
timeout=self.voltage_timeout
|
|
)
|
|
except Empty:
|
|
return (None, None) # Return tuple for unpacking
|
|
|
|
def discharge_phase(self):
|
|
"""Handle the battery discharging phase"""
|
|
voltage, _ = self.get_latest_measurement()
|
|
if voltage is not None and voltage <= self.discharge_cutoff:
|
|
self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)")
|
|
return
|
|
self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A")
|
|
|
|
try:
|
|
# Configure channels - Channel A sinks current, Channel B measures voltage
|
|
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
|
self.device.channels['A'].mode = pysmu.Mode.SIMV
|
|
self.device.channels['A'].constant(-self.test_current)
|
|
self.parent.measurement_thread.set_direction(-1) # Sink current
|
|
|
|
# Small delay to allow current to stabilize
|
|
time.sleep(0.1)
|
|
|
|
while self._running:
|
|
voltage, current = self.get_latest_measurement()
|
|
if voltage is None:
|
|
continue
|
|
|
|
# Update parent's data for logging/display
|
|
with self.parent.plot_mutex:
|
|
if len(self.parent.voltage_data) > 0:
|
|
self.parent.voltage_data[-1] = voltage
|
|
self.parent.current_data[-1] = current
|
|
|
|
if voltage <= self.discharge_cutoff:
|
|
break
|
|
|
|
time.sleep(0.1)
|
|
|
|
finally:
|
|
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.device.channels['A'].constant(0)
|
|
|
|
def stop(self):
|
|
"""Request the thread to stop"""
|
|
self._running = False
|
|
try:
|
|
self.device.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.device.channels['A'].constant(0)
|
|
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
|
except Exception as e:
|
|
print(f"Error stopping device: {e}")
|
|
|
|
def run(self):
|
|
"""Main discharge sequence"""
|
|
try:
|
|
self.parent.request_stop = False
|
|
self.parent.cycle_count = 1 # Only one discharge cycle
|
|
|
|
# Discharge phase
|
|
self.discharge_phase()
|
|
|
|
if not self._running or self.parent.request_stop:
|
|
return
|
|
|
|
# Test completed
|
|
self.test_completed.emit()
|
|
|
|
except Exception as e:
|
|
self.error_occurred.emit(f"Discharge error: {str(e)}")
|
|
finally:
|
|
self.finished.emit()
|
|
|
|
class ChargeWorker(QObject):
|
|
finished = pyqtSignal()
|
|
update_status = pyqtSignal(str)
|
|
test_completed = pyqtSignal()
|
|
error_occurred = pyqtSignal(str)
|
|
|
|
def __init__(self, device, test_current, charge_cutoff, parent):
|
|
super().__init__()
|
|
self.device = device
|
|
self.test_current = test_current
|
|
self.charge_cutoff = charge_cutoff
|
|
self.parent = parent
|
|
self._running = True
|
|
|
|
def run(self):
|
|
"""Main charge sequence"""
|
|
try:
|
|
self.parent.measurement_thread.set_direction(1) # Source current
|
|
|
|
# Configure channels - Channel A sources current, Channel B measures voltage
|
|
self.device.channels['B'].mode = pysmu.Mode.HI_Z
|
|
self.device.channels['A'].mode = pysmu.Mode.SIMV
|
|
self.device.channels['A'].constant(self.test_current)
|
|
time.sleep(0.1) # Allow current to stabilize
|
|
|
|
while self._running:
|
|
voltage, current = self.parent.get_latest_measurement()
|
|
if voltage is None:
|
|
continue
|
|
|
|
# Update parent's data for logging/display
|
|
with self.parent.plot_mutex:
|
|
if len(self.parent.voltage_data) > 0:
|
|
self.parent.voltage_data[-1] = voltage
|
|
self.parent.current_data[-1] = current
|
|
|
|
if voltage >= self.charge_cutoff:
|
|
break
|
|
|
|
time.sleep(0.1)
|
|
|
|
self.test_completed.emit()
|
|
except Exception as e:
|
|
self.error_occurred.emit(f"Charge error: {str(e)}")
|
|
finally:
|
|
self.device.channels['A'].constant(0)
|
|
self.finished.emit()
|
|
|
|
def stop(self):
|
|
"""Request the thread to stop"""
|
|
self._running = False
|
|
try:
|
|
self.device.channels['A'].constant(0)
|
|
except Exception as e:
|
|
print(f"Error stopping charge: {e}")
|
|
|
|
class BatteryTester(QMainWindow):
|
|
def __init__(self):
|
|
self.plot_mutex = threading.Lock()
|
|
super().__init__()
|
|
|
|
self.last_logged_phase = None
|
|
|
|
# Color scheme
|
|
self.bg_color = "#2E3440"
|
|
self.fg_color = "#D8DEE9"
|
|
self.accent_color = "#5E81AC"
|
|
self.warning_color = "#BF616A"
|
|
self.success_color = "#A3BE8C"
|
|
|
|
# Device and measurement state
|
|
self.session_active = False
|
|
self.measuring = False
|
|
self.test_running = False
|
|
self.continuous_mode = False
|
|
self.request_stop = False
|
|
self.interval = 0.1
|
|
self.log_dir = os.path.expanduser("~/adalm1000/logs")
|
|
os.makedirs(self.log_dir, exist_ok=True)
|
|
|
|
# Data buffers
|
|
self.time_data = deque()
|
|
self.voltage_data = deque()
|
|
self.current_data = deque()
|
|
self.phase_data = deque()
|
|
|
|
# Initialize UI and device
|
|
self.setup_ui()
|
|
self.init_device()
|
|
|
|
# Set window properties
|
|
self.setWindowTitle("ADALM1000 - Battery Tester (Multi-Mode)")
|
|
self.resize(1000, 800)
|
|
self.setMinimumSize(800, 700)
|
|
|
|
# Status update timer
|
|
self.status_timer = QTimer()
|
|
self.status_timer.timeout.connect(self.update_status)
|
|
self.status_timer.start(1000) # Update every second
|
|
|
|
def setup_ui(self):
|
|
"""Configure the user interface"""
|
|
# Main widget and layout
|
|
self.central_widget = QWidget()
|
|
self.setCentralWidget(self.central_widget)
|
|
self.main_layout = QVBoxLayout(self.central_widget)
|
|
self.main_layout.setContentsMargins(10, 10, 10, 10)
|
|
|
|
# Mode selection
|
|
mode_frame = QFrame()
|
|
mode_frame.setFrameShape(QFrame.StyledPanel)
|
|
mode_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
|
|
mode_layout = QHBoxLayout(mode_frame)
|
|
|
|
self.mode_label = QLabel("Test Mode:")
|
|
self.mode_label.setStyleSheet(f"color: {self.fg_color};")
|
|
mode_layout.addWidget(self.mode_label)
|
|
|
|
self.mode_combo = QComboBox()
|
|
self.mode_combo.addItems(["Live Monitoring", "Discharge Test", "Charge Test", "Cycle Test"]) # Added Charge Test
|
|
self.mode_combo.setStyleSheet(f"""
|
|
QComboBox {{
|
|
background-color: #3B4252;
|
|
color: {self.fg_color};
|
|
border: 1px solid #4C566A;
|
|
border-radius: 3px;
|
|
padding: 2px;
|
|
}}
|
|
""")
|
|
self.mode_combo.currentTextChanged.connect(self.change_mode)
|
|
mode_layout.addWidget(self.mode_combo, 1)
|
|
|
|
self.main_layout.addWidget(mode_frame)
|
|
|
|
# Header area
|
|
header_frame = QFrame()
|
|
header_frame.setFrameShape(QFrame.NoFrame)
|
|
header_layout = QHBoxLayout(header_frame)
|
|
header_layout.setContentsMargins(0, 0, 0, 0)
|
|
|
|
self.title_label = QLabel("ADALM1000 Battery Tester")
|
|
self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};")
|
|
header_layout.addWidget(self.title_label, 1)
|
|
|
|
# Status indicator
|
|
self.status_light = QLabel()
|
|
self.status_light.setFixedSize(20, 20)
|
|
self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
|
|
header_layout.addWidget(self.status_light)
|
|
|
|
self.connection_label = QLabel("Disconnected")
|
|
header_layout.addWidget(self.connection_label)
|
|
|
|
# Reconnect button
|
|
self.reconnect_btn = QPushButton("Reconnect")
|
|
self.reconnect_btn.clicked.connect(self.reconnect_device)
|
|
header_layout.addWidget(self.reconnect_btn)
|
|
|
|
self.main_layout.addWidget(header_frame)
|
|
|
|
# Measurement display
|
|
display_frame = QFrame()
|
|
display_frame.setFrameShape(QFrame.StyledPanel)
|
|
display_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
|
|
display_layout = QGridLayout(display_frame)
|
|
|
|
# Measurement values - common for all modes
|
|
measurement_labels = [
|
|
("Voltage", "V"), ("Current", "A"), ("Test Phase", ""),
|
|
("Elapsed Time", "s"), ("Capacity", "Ah"), ("Power", "W"),
|
|
("Energy", "Wh"), ("Cycle Count", ""), ("Battery Temp", "°C")
|
|
]
|
|
|
|
for i, (label, unit) in enumerate(measurement_labels):
|
|
row = i // 3
|
|
col = (i % 3) * 3
|
|
|
|
lbl = QLabel(f"{label}:")
|
|
lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;")
|
|
display_layout.addWidget(lbl, row, col)
|
|
|
|
value_lbl = QLabel("0.000")
|
|
value_lbl.setStyleSheet(f"""
|
|
color: {self.fg_color};
|
|
font-weight: bold;
|
|
font-size: 12px;
|
|
min-width: 60px;
|
|
""")
|
|
display_layout.addWidget(value_lbl, row, col + 1)
|
|
|
|
if unit:
|
|
unit_lbl = QLabel(unit)
|
|
unit_lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;")
|
|
display_layout.addWidget(unit_lbl, row, col + 2)
|
|
|
|
for i in range(9):
|
|
display_layout.setColumnStretch(i, 1 if i % 3 == 1 else 0)
|
|
|
|
self.voltage_label = display_layout.itemAtPosition(0, 1).widget()
|
|
self.current_label = display_layout.itemAtPosition(0, 4).widget()
|
|
self.phase_label = display_layout.itemAtPosition(0, 7).widget()
|
|
self.time_label = display_layout.itemAtPosition(1, 1).widget()
|
|
self.capacity_label = display_layout.itemAtPosition(1, 4).widget()
|
|
self.power_label = display_layout.itemAtPosition(1, 7).widget()
|
|
self.energy_label = display_layout.itemAtPosition(2, 1).widget()
|
|
self.cycle_label = display_layout.itemAtPosition(2, 4).widget()
|
|
self.temp_label = display_layout.itemAtPosition(2, 7).widget()
|
|
|
|
self.main_layout.addWidget(display_frame)
|
|
|
|
# Control area
|
|
controls_frame = QFrame()
|
|
controls_frame.setFrameShape(QFrame.NoFrame)
|
|
controls_layout = QHBoxLayout(controls_frame)
|
|
controls_layout.setContentsMargins(0, 0, 0, 0)
|
|
|
|
# Parameters frame
|
|
self.params_frame = QFrame()
|
|
self.params_frame.setFrameShape(QFrame.StyledPanel)
|
|
self.params_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
|
|
self.params_layout = QGridLayout(self.params_frame)
|
|
|
|
# Common parameters
|
|
self.capacity = 0.2
|
|
self.capacity_label_input = QLabel("Battery Capacity (Ah):")
|
|
self.capacity_label_input.setStyleSheet(f"color: {self.fg_color};")
|
|
self.params_layout.addWidget(self.capacity_label_input, 0, 0)
|
|
self.capacity_input = QLineEdit("0.2")
|
|
self.capacity_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
|
self.capacity_input.setFixedWidth(60)
|
|
self.params_layout.addWidget(self.capacity_input, 0, 1)
|
|
|
|
# C-rate for test
|
|
self.c_rate = 0.1
|
|
self.c_rate_label = QLabel("Test C-rate:")
|
|
self.c_rate_label.setStyleSheet(f"color: {self.fg_color};")
|
|
self.params_layout.addWidget(self.c_rate_label, 1, 0)
|
|
self.c_rate_input = QLineEdit("0.1")
|
|
self.c_rate_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
|
self.c_rate_input.setFixedWidth(40)
|
|
self.params_layout.addWidget(self.c_rate_input, 1, 1)
|
|
|
|
c_rate_note = QLabel("(e.g., 0.2 for C/5)")
|
|
c_rate_note.setStyleSheet(f"color: {self.fg_color};")
|
|
self.params_layout.addWidget(c_rate_note, 1, 2)
|
|
|
|
# Discharge cutoff (used in Discharge and Cycle modes)
|
|
self.discharge_cutoff = 0.9
|
|
self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):")
|
|
self.discharge_cutoff_label.setStyleSheet(f"color: {self.fg_color};")
|
|
self.params_layout.addWidget(self.discharge_cutoff_label, 2, 0)
|
|
self.discharge_cutoff_input = QLineEdit("0.9")
|
|
self.discharge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
|
self.discharge_cutoff_input.setFixedWidth(60)
|
|
self.params_layout.addWidget(self.discharge_cutoff_input, 2, 1)
|
|
|
|
# Charge cutoff (only for Cycle mode)
|
|
self.charge_cutoff = 1.43
|
|
self.charge_cutoff_label = QLabel("Charge Cutoff (V):")
|
|
self.charge_cutoff_label.setStyleSheet(f"color: {self.fg_color};")
|
|
self.params_layout.addWidget(self.charge_cutoff_label, 3, 0)
|
|
self.charge_cutoff_input = QLineEdit("1.43")
|
|
self.charge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
|
self.charge_cutoff_input.setFixedWidth(60)
|
|
self.params_layout.addWidget(self.charge_cutoff_input, 3, 1)
|
|
self.charge_cutoff_label.hide()
|
|
self.charge_cutoff_input.hide()
|
|
|
|
# Rest time (only for Cycle mode)
|
|
self.rest_time = 0.25
|
|
self.rest_time_label = QLabel("Rest Time (hours):")
|
|
self.rest_time_label.setStyleSheet(f"color: {self.fg_color};")
|
|
self.params_layout.addWidget(self.rest_time_label, 4, 0)
|
|
self.rest_time_input = QLineEdit("0.25")
|
|
self.rest_time_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
|
self.rest_time_input.setFixedWidth(60)
|
|
self.params_layout.addWidget(self.rest_time_input, 4, 1)
|
|
self.rest_time_label.hide()
|
|
self.rest_time_input.hide()
|
|
|
|
# Test conditions input
|
|
self.test_conditions_label = QLabel("Test Conditions/Chemistry:")
|
|
self.test_conditions_label.setStyleSheet(f"color: {self.fg_color};")
|
|
self.params_layout.addWidget(self.test_conditions_label, 5, 0)
|
|
self.test_conditions_input = QLineEdit("")
|
|
self.test_conditions_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
|
|
self.test_conditions_input.setFixedWidth(120)
|
|
self.params_layout.addWidget(self.test_conditions_input, 5, 1)
|
|
|
|
controls_layout.addWidget(self.params_frame, 1)
|
|
|
|
# Button frame
|
|
button_frame = QFrame()
|
|
button_frame.setFrameShape(QFrame.NoFrame)
|
|
button_layout = QVBoxLayout(button_frame)
|
|
button_layout.setContentsMargins(0, 0, 0, 0)
|
|
|
|
# Start/Stop buttons
|
|
self.start_button = QPushButton("START")
|
|
self.start_button.setStyleSheet(f"""
|
|
QPushButton {{
|
|
background-color: {self.accent_color};
|
|
color: {self.fg_color};
|
|
font-weight: bold;
|
|
padding: 6px;
|
|
border-radius: 4px;
|
|
}}
|
|
QPushButton:disabled {{
|
|
background-color: #4C566A;
|
|
color: #D8DEE9;
|
|
}}
|
|
""")
|
|
self.start_button.clicked.connect(self.start_test)
|
|
button_layout.addWidget(self.start_button)
|
|
|
|
self.stop_button = QPushButton("STOP")
|
|
self.stop_button.setStyleSheet(f"""
|
|
QPushButton {{
|
|
background-color: {self.warning_color};
|
|
color: {self.fg_color};
|
|
font-weight: bold;
|
|
padding: 6px;
|
|
border-radius: 4px;
|
|
}}
|
|
QPushButton:disabled {{
|
|
background-color: #4C566A;
|
|
color: #D8DEE9;
|
|
}}
|
|
""")
|
|
self.stop_button.clicked.connect(self.stop_test)
|
|
self.stop_button.setEnabled(False)
|
|
button_layout.addWidget(self.stop_button)
|
|
|
|
# Continuous mode checkbox (only for Cycle mode)
|
|
self.continuous_mode_check = QCheckBox("Continuous Mode")
|
|
self.continuous_mode_check.setChecked(True)
|
|
self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")
|
|
button_layout.addWidget(self.continuous_mode_check)
|
|
self.continuous_mode_check.stateChanged.connect(self.handle_continuous_mode_change)
|
|
self.continuous_mode_check.hide()
|
|
|
|
# Record button for Live mode
|
|
self.record_button = QPushButton("Start Recording")
|
|
self.record_button.setCheckable(True)
|
|
self.record_button.setStyleSheet(f"""
|
|
QPushButton {{
|
|
background-color: {self.success_color};
|
|
color: {self.fg_color};
|
|
font-weight: bold;
|
|
padding: 6px;
|
|
border-radius: 4px;
|
|
}}
|
|
QPushButton:checked {{
|
|
background-color: {self.warning_color};
|
|
}}
|
|
""")
|
|
self.record_button.clicked.connect(self.toggle_recording)
|
|
button_layout.addWidget(self.record_button)
|
|
self.record_button.hide()
|
|
|
|
controls_layout.addWidget(button_frame)
|
|
self.main_layout.addWidget(controls_frame)
|
|
|
|
# Plot area
|
|
self.setup_plot()
|
|
|
|
# Status bar
|
|
self.status_bar = self.statusBar()
|
|
self.status_bar.setStyleSheet(f"color: {self.fg_color};")
|
|
self.status_bar.showMessage("Ready")
|
|
|
|
# Apply dark theme
|
|
self.setStyleSheet(f"""
|
|
QMainWindow {{
|
|
background-color: {self.bg_color};
|
|
}}
|
|
QLabel {{
|
|
color: {self.fg_color};
|
|
}}
|
|
QLineEdit {{
|
|
background-color: #3B4252;
|
|
color: {self.fg_color};
|
|
border: 1px solid #4C566A;
|
|
border-radius: 3px;
|
|
padding: 2px;
|
|
}}
|
|
""")
|
|
|
|
# Set initial mode
|
|
self.current_mode = "Live Monitoring"
|
|
self.mode_combo.setCurrentText(self.current_mode)
|
|
self.change_mode(self.current_mode) # Initialize UI for live mode
|
|
|
|
def change_mode(self, mode_name):
|
|
"""Change between different test modes"""
|
|
self.current_mode = mode_name
|
|
self.stop_test() # Stop any current operation
|
|
|
|
# Hide all optional parameters first
|
|
self.charge_cutoff_label.hide()
|
|
self.charge_cutoff_input.hide()
|
|
self.discharge_cutoff_label.hide()
|
|
self.discharge_cutoff_input.hide()
|
|
self.rest_time_label.hide()
|
|
self.rest_time_input.hide()
|
|
self.continuous_mode_check.hide()
|
|
self.record_button.hide()
|
|
|
|
# Show mode-specific parameters
|
|
if mode_name == "Cycle Test":
|
|
self.charge_cutoff_label.show()
|
|
self.charge_cutoff_input.show()
|
|
self.discharge_cutoff_label.show()
|
|
self.discharge_cutoff_input.show()
|
|
self.rest_time_label.show()
|
|
self.rest_time_input.show()
|
|
self.continuous_mode_check.show()
|
|
self.start_button.setText("START CYCLE TEST")
|
|
elif mode_name == "Discharge Test":
|
|
self.discharge_cutoff_label.show()
|
|
self.discharge_cutoff_input.show()
|
|
self.start_button.setText("START DISCHARGE")
|
|
elif mode_name == "Charge Test":
|
|
self.charge_cutoff_label.show()
|
|
self.charge_cutoff_input.show()
|
|
self.start_button.setText("START CHARGE")
|
|
elif mode_name == "Live Monitoring":
|
|
self.record_button.show()
|
|
self.start_button.setText("START MONITORING")
|
|
self.start_button.setEnabled(False)
|
|
|
|
self.status_bar.showMessage(f"Mode changed to {mode_name}")
|
|
|
|
def reset_test(self):
|
|
"""Reset test state without stopping measurement"""
|
|
# Clear data buffers
|
|
with self.plot_mutex:
|
|
self.time_data.clear()
|
|
self.voltage_data.clear()
|
|
self.current_data.clear()
|
|
if hasattr(self, 'phase_data'):
|
|
self.phase_data.clear()
|
|
|
|
# Reset capacities and timing
|
|
self.start_time = time.time()
|
|
self.last_update_time = self.start_time
|
|
self.capacity_ah = 0.0
|
|
self.energy = 0.0
|
|
if hasattr(self, 'charge_capacity'):
|
|
self.charge_capacity = 0.0
|
|
if hasattr(self, 'coulomb_efficiency'):
|
|
self.coulomb_efficiency = 0.0
|
|
|
|
# Reset plot
|
|
self.reset_plot()
|
|
|
|
# Update UI
|
|
self.phase_label.setText("Idle")
|
|
if hasattr(self, 'test_phase'):
|
|
self.test_phase = "Idle"
|
|
|
|
def toggle_recording(self):
|
|
"""Toggle data recording in Live Monitoring mode"""
|
|
if self.record_button.isChecked():
|
|
# Start recording
|
|
try:
|
|
if self.create_cycle_log_file():
|
|
self.record_button.setText("Stop Recording")
|
|
self.status_bar.showMessage("Live recording started")
|
|
# Ensure monitoring is running
|
|
if not self.test_running:
|
|
self.start_live_monitoring()
|
|
else:
|
|
self.record_button.setChecked(False)
|
|
self.current_cycle_file = None # Ensure it's None if creation failed
|
|
except Exception as e:
|
|
print(f"Error starting recording: {e}")
|
|
self.record_button.setChecked(False)
|
|
self.current_cycle_file = None # Ensure it's None on error
|
|
QMessageBox.critical(self, "Error", f"Failed to start recording:\n{str(e)}")
|
|
else:
|
|
# Stop recording
|
|
try:
|
|
if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
|
|
self.finalize_log_file()
|
|
self.record_button.setText("Start Recording")
|
|
self.status_bar.showMessage("Live recording stopped")
|
|
except Exception as e:
|
|
print(f"Error stopping recording: {e}")
|
|
|
|
def handle_continuous_mode_change(self, state):
|
|
"""Handle changes to continuous mode checkbox during operation"""
|
|
if not state and self.test_running: # If unchecked during test
|
|
self.status_bar.showMessage("Continuous mode disabled - will complete current cycle")
|
|
self.continuous_mode_check.setStyleSheet(f"color: {self.warning_color};")
|
|
QTimer.singleShot(2000, lambda: self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};"))
|
|
|
|
def setup_plot(self):
|
|
"""Configure the matplotlib plot"""
|
|
self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color)
|
|
self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
|
|
self.ax = self.fig.add_subplot(111)
|
|
self.ax.set_facecolor('#3B4252')
|
|
|
|
# Set initial voltage range
|
|
voltage_padding = 0.2
|
|
min_voltage = max(0, 0.9 - voltage_padding)
|
|
max_voltage = 1.43 + voltage_padding
|
|
self.ax.set_ylim(min_voltage, max_voltage)
|
|
|
|
# Voltage plot
|
|
self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2)
|
|
self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
|
|
self.ax.tick_params(axis='y', labelcolor='#00BFFF')
|
|
|
|
# Current plot (right axis)
|
|
self.ax2 = self.ax.twinx()
|
|
current_padding = 0.05
|
|
test_current = 0.1 * 0.2 # Default values
|
|
max_current = test_current * 1.5
|
|
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
|
|
|
|
self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
|
|
self.ax2.set_ylabel("Current (A)", color='r')
|
|
self.ax2.tick_params(axis='y', labelcolor='r')
|
|
|
|
self.ax.set_xlabel('Time (s)', color=self.fg_color)
|
|
self.ax.set_title('Battery Test', color=self.fg_color)
|
|
self.ax.tick_params(axis='x', colors=self.fg_color)
|
|
self.ax.grid(True, color='#4C566A')
|
|
|
|
# Position legends
|
|
self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
|
|
self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))
|
|
|
|
# Embed plot
|
|
self.canvas = FigureCanvas(self.fig)
|
|
self.canvas.setStyleSheet(f"background-color: {self.bg_color};")
|
|
self.main_layout.addWidget(self.canvas, 1)
|
|
|
|
def init_device(self):
|
|
"""Initialize the ADALM1000 with proper connection checks"""
|
|
try:
|
|
# Cleanup previous session
|
|
if hasattr(self, 'session'):
|
|
try:
|
|
self.session.end()
|
|
del self.session
|
|
except:
|
|
pass
|
|
|
|
# Hardware reset delay
|
|
time.sleep(1)
|
|
|
|
try:
|
|
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
|
|
if not self.session.devices:
|
|
raise Exception("Kein ADALM1000 erkannt - Verbindung prüfen")
|
|
except Exception as e:
|
|
if "resource busy" in str(e).lower():
|
|
raise Exception("ADALM1000 wird bereits von einem anderen Programm verwendet")
|
|
raise
|
|
|
|
self.dev = self.session.devices[0]
|
|
|
|
# Set safe defaults
|
|
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['A'].constant(0)
|
|
self.dev.channels['B'].constant(0)
|
|
|
|
# Connection test with actual measurement
|
|
try:
|
|
samples = self.dev.read(5, 500, True) # 5 samples for stability
|
|
if not samples:
|
|
raise DeviceDisconnectedError("Keine Messwerte empfangen")
|
|
|
|
# Check if we're getting valid data (not just noise)
|
|
voltages = [s[1][0] for s in samples] # Channel B voltages
|
|
if all(abs(v) < 0.0001 for v in voltages): # 100µV threshold
|
|
self.status_bar.showMessage("Gerät verbunden, aber keine Batterie angeschlossen")
|
|
except Exception as e:
|
|
raise DeviceDisconnectedError(f"Verbindungstest fehlgeschlagen: {str(e)}")
|
|
|
|
self.session.start(0)
|
|
|
|
# Update UI
|
|
self.status_light.setStyleSheet("background-color: green; border-radius: 10px;")
|
|
self.connection_label.setText("Verbunden")
|
|
self.status_bar.showMessage("Gerät bereit - Batterie anschließen um zu messen")
|
|
self.session_active = True
|
|
self.start_button.setEnabled(True)
|
|
|
|
# Start measurement thread
|
|
self.measurement_thread = MeasurementThread(self.dev, self.interval)
|
|
self.measurement_thread.update_signal.connect(self.update_measurements)
|
|
self.measurement_thread.error_signal.connect(self.handle_device_error)
|
|
self.measurement_thread.start()
|
|
|
|
except Exception as e:
|
|
self.handle_device_error(str(e))
|
|
|
|
@pyqtSlot(float, float, float)
|
|
def update_measurements(self, voltage, current, current_time):
|
|
try:
|
|
# Only store data if in a test or recording
|
|
if not (self.test_running or self.record_button.isChecked()):
|
|
return
|
|
|
|
with self.plot_mutex:
|
|
self.time_data.append(current_time)
|
|
self.voltage_data.append(voltage)
|
|
self.current_data.append(current)
|
|
|
|
# Update display labels
|
|
self.voltage_label.setText(f"{voltage:.4f}")
|
|
self.current_label.setText(f"{abs(current):.4f}")
|
|
self.time_label.setText(self.format_time(current_time))
|
|
|
|
# Calculate and display power and energy
|
|
power = voltage * abs(current)
|
|
self.power_label.setText(f"{power:.4f}")
|
|
|
|
if len(self.time_data) > 1:
|
|
delta_t = self.time_data[-1] - self.time_data[-2]
|
|
self.energy += power * delta_t / 3600 # Convert to Wh
|
|
self.energy_label.setText(f"{self.energy:.4f}")
|
|
|
|
# Plot updates throttled to 10Hz
|
|
now = time.time()
|
|
if not hasattr(self, '_last_plot_update'):
|
|
self._last_plot_update = 0
|
|
|
|
if now - self._last_plot_update >= 0.1:
|
|
self._last_plot_update = now
|
|
QTimer.singleShot(0, self.update_plot)
|
|
|
|
except Exception as e:
|
|
print(f"Error in update_measurements: {e}")
|
|
|
|
def update_status(self):
|
|
"""Update status information periodically"""
|
|
now = time.time()
|
|
if not hasattr(self, '_last_log_time'):
|
|
self._last_log_time = now
|
|
|
|
if self.test_running or (hasattr(self, 'record_button') and self.record_button.isChecked()):
|
|
# Update capacity calculations if in test mode
|
|
if self.time_data:
|
|
current_time = time.time() - self.start_time
|
|
delta_t = current_time - self.last_update_time
|
|
self.last_update_time = current_time
|
|
|
|
current_current = abs(self.current_data[-1])
|
|
self.capacity_ah += current_current * delta_t / 3600
|
|
self.capacity_label.setText(f"{self.capacity_ah:.4f}")
|
|
|
|
# Logging (1x per second)
|
|
if hasattr(self, 'log_writer') and hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
|
|
if self.time_data and not self.current_cycle_file.closed and (now - self._last_log_time >= 1.0):
|
|
try:
|
|
current_time = self.time_data[-1]
|
|
voltage = self.voltage_data[-1]
|
|
current = self.current_data[-1]
|
|
|
|
if self.current_mode == "Cycle Test":
|
|
self.log_writer.writerow([
|
|
f"{current_time:.4f}",
|
|
f"{voltage:.6f}",
|
|
f"{current:.6f}",
|
|
self.test_phase,
|
|
f"{self.capacity_ah:.4f}",
|
|
f"{self.charge_capacity:.4f}",
|
|
f"{self.coulomb_efficiency:.1f}",
|
|
f"{self.cycle_count}"
|
|
])
|
|
else:
|
|
self.log_writer.writerow([
|
|
f"{current_time:.4f}",
|
|
f"{voltage:.6f}",
|
|
f"{current:.6f}",
|
|
self.test_phase if hasattr(self, 'test_phase') else "Live",
|
|
f"{self.capacity_ah:.4f}",
|
|
f"{voltage * current:.4f}", # Power
|
|
f"{self.energy:.4f}", # Energy
|
|
f"{self.cycle_count}" if hasattr(self, 'cycle_count') else "1"
|
|
])
|
|
self.current_cycle_file.flush()
|
|
self._last_log_time = now
|
|
except Exception as e:
|
|
print(f"Error writing to log file: {e}")
|
|
if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
|
|
try:
|
|
self.current_cycle_file.close()
|
|
except:
|
|
pass
|
|
self.record_button.setChecked(False)
|
|
self.current_cycle_file = None
|
|
|
|
def start_test(self):
|
|
"""Start the selected test mode"""
|
|
if self.current_mode == "Cycle Test":
|
|
self.start_cycle_test()
|
|
elif self.current_mode == "Discharge Test":
|
|
self.start_discharge_test()
|
|
elif self.current_mode == "Charge Test":
|
|
self.start_charge_test()
|
|
elif self.current_mode == "Live Monitoring":
|
|
self.start_live_monitoring()
|
|
|
|
def start_cycle_test(self):
|
|
"""Start the battery cycle test"""
|
|
# Clean up any previous test
|
|
if hasattr(self, 'test_sequence_thread'):
|
|
self.test_sequence_thread.quit()
|
|
self.test_sequence_thread.wait(500)
|
|
if hasattr(self, 'test_sequence_worker'):
|
|
self.test_sequence_worker.deleteLater()
|
|
del self.test_sequence_thread
|
|
|
|
# Reset stop flag
|
|
self.request_stop = False
|
|
|
|
if not self.test_running:
|
|
try:
|
|
# Get parameters from UI
|
|
self.capacity = float(self.capacity_input.text())
|
|
self.charge_cutoff = float(self.charge_cutoff_input.text())
|
|
self.discharge_cutoff = float(self.discharge_cutoff_input.text())
|
|
self.rest_time = float(self.rest_time_input.text())
|
|
self.c_rate = float(self.c_rate_input.text())
|
|
|
|
# Validate inputs
|
|
if self.capacity <= 0:
|
|
raise ValueError("Battery capacity must be positive")
|
|
if self.charge_cutoff <= self.discharge_cutoff:
|
|
raise ValueError("Charge cutoff must be higher than discharge cutoff")
|
|
if self.c_rate <= 0:
|
|
raise ValueError("C-rate must be positive")
|
|
|
|
test_current = self.c_rate * self.capacity
|
|
if test_current > 0.2:
|
|
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
|
|
|
|
# Clear ALL previous data completely
|
|
with self.plot_mutex:
|
|
self.time_data.clear()
|
|
self.voltage_data.clear()
|
|
self.current_data.clear()
|
|
self.phase_data.clear()
|
|
|
|
# Reset capacities and timing
|
|
self.start_time = time.time()
|
|
self.last_update_time = self.start_time
|
|
self.capacity_ah = 0.0
|
|
self.charge_capacity = 0.0
|
|
self.coulomb_efficiency = 0.0
|
|
self.cycle_count = 0
|
|
self.energy = 0.0
|
|
|
|
# Reset measurement thread's timer and queues
|
|
if hasattr(self, 'measurement_thread'):
|
|
self.measurement_thread.start_time = time.time()
|
|
self.measurement_thread.voltage_window.clear()
|
|
self.measurement_thread.current_window.clear()
|
|
with self.measurement_thread.measurement_queue.mutex:
|
|
self.measurement_thread.measurement_queue.queue.clear()
|
|
|
|
# Reset plot completely
|
|
self.reset_plot()
|
|
|
|
# Start test
|
|
self.test_running = True
|
|
self.start_time = time.time()
|
|
self.last_update_time = time.time()
|
|
self.test_phase = "Initial Discharge"
|
|
self.phase_label.setText(self.test_phase)
|
|
|
|
self.start_button.setEnabled(False)
|
|
self.stop_button.setEnabled(True)
|
|
self.status_bar.showMessage(f"Cycle test started | Current: {test_current:.4f}A")
|
|
|
|
# Create log file
|
|
self.create_cycle_log_file()
|
|
|
|
# Start test sequence in a QThread
|
|
self.test_sequence_thread = QThread()
|
|
self.test_sequence_worker = TestSequenceWorker(
|
|
self.dev,
|
|
test_current,
|
|
self.charge_cutoff,
|
|
self.discharge_cutoff,
|
|
self.rest_time,
|
|
self.continuous_mode_check.isChecked(),
|
|
self # Pass reference to main window for callbacks
|
|
)
|
|
self.test_sequence_worker.moveToThread(self.test_sequence_thread)
|
|
|
|
# Connect signals
|
|
self.test_sequence_worker.update_phase.connect(self.update_test_phase)
|
|
self.test_sequence_worker.update_status.connect(self.status_bar.showMessage)
|
|
self.test_sequence_worker.test_completed.connect(self.finalize_test)
|
|
self.test_sequence_worker.error_occurred.connect(self.handle_test_error)
|
|
self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit)
|
|
self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater)
|
|
self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater)
|
|
|
|
# Start the thread and the worker's run method
|
|
self.test_sequence_thread.start()
|
|
QTimer.singleShot(0, self.test_sequence_worker.run)
|
|
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "Error", str(e))
|
|
# Ensure buttons are in correct state if error occurs
|
|
self.start_button.setEnabled(True)
|
|
self.stop_button.setEnabled(False)
|
|
|
|
def start_discharge_test(self):
|
|
"""Start the battery discharge test"""
|
|
# Clean up any previous test
|
|
if hasattr(self, 'discharge_thread'):
|
|
self.discharge_thread.quit()
|
|
self.discharge_thread.wait(500)
|
|
if hasattr(self, 'discharge_worker'):
|
|
self.discharge_worker.deleteLater()
|
|
del self.discharge_thread
|
|
|
|
# Reset stop flag
|
|
self.request_stop = False
|
|
|
|
if not self.test_running:
|
|
try:
|
|
# Get parameters from UI
|
|
self.capacity = float(self.capacity_input.text())
|
|
self.discharge_cutoff = float(self.discharge_cutoff_input.text())
|
|
self.c_rate = float(self.c_rate_input.text())
|
|
|
|
# Validate inputs
|
|
if self.capacity <= 0:
|
|
raise ValueError("Battery capacity must be positive")
|
|
if self.c_rate <= 0:
|
|
raise ValueError("C-rate must be positive")
|
|
|
|
test_current = self.c_rate * self.capacity
|
|
if test_current > 0.2:
|
|
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
|
|
|
|
# Clear ALL previous data completely
|
|
with self.plot_mutex:
|
|
self.time_data.clear()
|
|
self.voltage_data.clear()
|
|
self.current_data.clear()
|
|
|
|
# Reset capacities and timing
|
|
self.start_time = time.time()
|
|
self.last_update_time = self.start_time
|
|
self.capacity_ah = 0.0
|
|
self.energy = 0.0
|
|
self.cycle_count = 1
|
|
|
|
# Reset measurement thread's timer and queues
|
|
if hasattr(self, 'measurement_thread'):
|
|
self.measurement_thread.start_time = time.time()
|
|
self.measurement_thread.voltage_window.clear()
|
|
self.measurement_thread.current_window.clear()
|
|
with self.measurement_thread.measurement_queue.mutex:
|
|
self.measurement_thread.measurement_queue.queue.clear()
|
|
|
|
# Reset plot completely
|
|
self.reset_plot()
|
|
|
|
# Start test
|
|
self.test_running = True
|
|
self.start_time = time.time()
|
|
self.last_update_time = time.time()
|
|
self.test_phase = "Discharge"
|
|
self.phase_label.setText(self.test_phase)
|
|
|
|
self.start_button.setEnabled(False)
|
|
self.stop_button.setEnabled(True)
|
|
self.status_bar.showMessage(f"Discharge started | Current: {test_current:.4f}A")
|
|
|
|
# Create log file
|
|
self.create_cycle_log_file()
|
|
|
|
# Start discharge worker in a QThread
|
|
self.discharge_thread = QThread()
|
|
self.discharge_worker = DischargeWorker(
|
|
self.dev,
|
|
test_current,
|
|
self.discharge_cutoff,
|
|
self # Pass reference to main window for callbacks
|
|
)
|
|
self.discharge_worker.moveToThread(self.discharge_thread)
|
|
|
|
# Connect signals
|
|
self.discharge_worker.update_status.connect(self.status_bar.showMessage)
|
|
self.discharge_worker.test_completed.connect(self.finalize_test)
|
|
self.discharge_worker.error_occurred.connect(self.handle_test_error)
|
|
self.discharge_worker.finished.connect(self.discharge_thread.quit)
|
|
self.discharge_worker.finished.connect(self.discharge_worker.deleteLater)
|
|
self.discharge_thread.finished.connect(self.discharge_thread.deleteLater)
|
|
|
|
# Start the thread and the worker's run method
|
|
self.discharge_thread.start()
|
|
QTimer.singleShot(0, self.discharge_worker.run)
|
|
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "Error", str(e))
|
|
# Ensure buttons are in correct state if error occurs
|
|
self.start_button.setEnabled(True)
|
|
self.stop_button.setEnabled(False)
|
|
|
|
def start_charge_test(self):
|
|
"""Start the battery charge test"""
|
|
# Clean up any previous test
|
|
if hasattr(self, 'charge_thread'):
|
|
self.charge_thread.quit()
|
|
self.charge_thread.wait(500)
|
|
if hasattr(self, 'charge_worker'):
|
|
self.charge_worker.deleteLater()
|
|
del self.charge_thread
|
|
|
|
# Reset stop flag
|
|
self.request_stop = False
|
|
|
|
if not self.test_running:
|
|
try:
|
|
# Get parameters from UI
|
|
self.capacity = float(self.capacity_input.text())
|
|
self.charge_cutoff = float(self.charge_cutoff_input.text())
|
|
self.c_rate = float(self.c_rate_input.text())
|
|
|
|
# Validate inputs
|
|
if self.capacity <= 0:
|
|
raise ValueError("Battery capacity must be positive")
|
|
if self.c_rate <= 0:
|
|
raise ValueError("C-rate must be positive")
|
|
|
|
test_current = self.c_rate * self.capacity
|
|
if test_current > 0.2:
|
|
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
|
|
|
|
# Clear ALL previous data completely
|
|
with self.plot_mutex:
|
|
self.time_data.clear()
|
|
self.voltage_data.clear()
|
|
self.current_data.clear()
|
|
|
|
# Reset capacities and timing
|
|
self.start_time = time.time()
|
|
self.last_update_time = self.start_time
|
|
self.capacity_ah = 0.0
|
|
self.energy = 0.0
|
|
self.cycle_count = 1
|
|
|
|
# Reset measurement thread
|
|
if hasattr(self, 'measurement_thread'):
|
|
self.measurement_thread.start_time = time.time()
|
|
self.measurement_thread.voltage_window.clear()
|
|
self.measurement_thread.current_window.clear()
|
|
with self.measurement_thread.measurement_queue.mutex:
|
|
self.measurement_thread.measurement_queue.queue.clear()
|
|
|
|
# Reset plot
|
|
self.reset_plot()
|
|
|
|
# Start test
|
|
self.test_running = True
|
|
self.start_time = time.time()
|
|
self.last_update_time = time.time()
|
|
self.test_phase = "Charge"
|
|
self.phase_label.setText(self.test_phase)
|
|
|
|
self.start_button.setEnabled(False)
|
|
self.stop_button.setEnabled(True)
|
|
self.status_bar.showMessage(f"Charge started @ {test_current:.3f}A to {self.charge_cutoff}V")
|
|
|
|
# Create log file
|
|
self.create_cycle_log_file()
|
|
|
|
# Start charge worker in a QThread
|
|
self.charge_thread = QThread()
|
|
self.charge_worker = ChargeWorker(
|
|
self.dev,
|
|
test_current,
|
|
self.charge_cutoff,
|
|
self
|
|
)
|
|
self.charge_worker.moveToThread(self.charge_thread)
|
|
|
|
# Connect signals
|
|
self.charge_worker.update_status.connect(self.status_bar.showMessage)
|
|
self.charge_worker.test_completed.connect(self.finalize_test)
|
|
self.charge_worker.error_occurred.connect(self.handle_test_error)
|
|
self.charge_worker.finished.connect(self.charge_thread.quit)
|
|
self.charge_worker.finished.connect(self.charge_worker.deleteLater)
|
|
self.charge_thread.finished.connect(self.charge_thread.deleteLater)
|
|
|
|
# Start the thread
|
|
self.charge_thread.start()
|
|
QTimer.singleShot(0, self.charge_worker.run)
|
|
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "Error", str(e))
|
|
self.start_button.setEnabled(True)
|
|
self.stop_button.setEnabled(False)
|
|
|
|
def start_live_monitoring(self):
|
|
"""Start live monitoring mode"""
|
|
try:
|
|
# Clear previous data
|
|
with self.plot_mutex:
|
|
self.time_data.clear()
|
|
self.voltage_data.clear()
|
|
self.current_data.clear()
|
|
|
|
# Reset timing and measurements
|
|
if hasattr(self, 'measurement_thread'):
|
|
self.measurement_thread.start_time = time.time()
|
|
self.start_time = time.time()
|
|
self.last_update_time = self.start_time
|
|
self.capacity_ah = 0.0
|
|
self.energy = 0.0
|
|
|
|
# Set monitoring flags
|
|
self.test_running = True
|
|
self.test_phase = "Live Monitoring"
|
|
self.phase_label.setText(self.test_phase)
|
|
|
|
# Update UI
|
|
self.stop_button.setEnabled(True)
|
|
self.start_button.setEnabled(False)
|
|
|
|
# Configure device for monitoring
|
|
if hasattr(self, 'dev'):
|
|
try:
|
|
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['A'].constant(0)
|
|
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
|
except Exception as e:
|
|
print(f"Error configuring device for monitoring: {e}")
|
|
|
|
self.status_bar.showMessage("Live monitoring started")
|
|
except Exception as e:
|
|
print(f"Error starting live monitoring: {e}")
|
|
self.test_running = False
|
|
QMessageBox.critical(self, "Error", f"Failed to start monitoring:\n{str(e)}")
|
|
|
|
def create_cycle_log_file(self):
|
|
"""Create a new log file for the current test"""
|
|
try:
|
|
# Close previous file if exists
|
|
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
|
|
try:
|
|
self.current_cycle_file.close()
|
|
except Exception as e:
|
|
print(f"Error closing previous log file: {e}")
|
|
|
|
# Ensure log directory exists
|
|
os.makedirs(self.log_dir, exist_ok=True)
|
|
|
|
if not os.access(self.log_dir, os.W_OK):
|
|
QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}")
|
|
return False
|
|
|
|
# Generate filename based on mode
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
if self.current_mode == "Cycle Test":
|
|
self.filename = os.path.join(self.log_dir, f"battery_cycle_{timestamp}.csv")
|
|
elif self.current_mode == "Discharge Test":
|
|
self.filename = os.path.join(self.log_dir, f"battery_discharge_{timestamp}.csv")
|
|
else: # Live Monitoring
|
|
self.filename = os.path.join(self.log_dir, f"battery_live_{timestamp}.csv")
|
|
|
|
# Open new file
|
|
try:
|
|
self.current_cycle_file = open(self.filename, 'w', newline='')
|
|
|
|
# Write header with test parameters
|
|
test_current = self.c_rate * self.capacity
|
|
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
|
|
|
|
self.current_cycle_file.write(f"# ADALM1000 Battery Test Log - {self.current_mode}\n")
|
|
self.current_cycle_file.write(f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
|
self.current_cycle_file.write(f"# Battery Capacity: {self.capacity} Ah\n")
|
|
|
|
if self.current_mode != "Live Monitoring":
|
|
self.current_cycle_file.write(f"# Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n")
|
|
|
|
if self.current_mode == "Cycle Test":
|
|
self.current_cycle_file.write(f"# Charge Cutoff: {self.charge_cutoff} V\n")
|
|
self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n")
|
|
self.current_cycle_file.write(f"# Rest Time: {self.rest_time} hours\n")
|
|
elif self.current_mode == "Discharge Test":
|
|
self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n")
|
|
|
|
self.current_cycle_file.write(f"# Test Conditions/Chemistry: {test_conditions}\n")
|
|
self.current_cycle_file.write("#\n")
|
|
|
|
# Write data header
|
|
self.log_writer = csv.writer(self.current_cycle_file)
|
|
|
|
if self.current_mode == "Cycle Test":
|
|
self.log_writer.writerow([
|
|
"Time(s)", "Voltage(V)", "Current(A)", "Phase",
|
|
"Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
|
|
"Coulomb_Eff(%)", "Cycle"
|
|
])
|
|
else:
|
|
self.log_writer.writerow([
|
|
"Time(s)", "Voltage(V)", "Current(A)", "Phase",
|
|
"Capacity(Ah)", "Power(W)", "Energy(Wh)", "Cycle"
|
|
])
|
|
return True
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "Error", f"Failed to create log file: {e}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"Error in create_cycle_log_file: {e}")
|
|
return False
|
|
|
|
def finalize_log_file(self):
|
|
"""Finalize the current log file"""
|
|
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
|
|
try:
|
|
test_current = self.c_rate * self.capacity
|
|
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
|
|
|
|
self.current_cycle_file.write("\n# TEST SUMMARY\n")
|
|
self.current_cycle_file.write(f"# Test Parameters:\n")
|
|
self.current_cycle_file.write(f"# - Battery Capacity: {self.capacity} Ah\n")
|
|
|
|
if self.current_mode != "Live Monitoring":
|
|
self.current_cycle_file.write(f"# - Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n")
|
|
|
|
if self.current_mode == "Cycle Test":
|
|
self.current_cycle_file.write(f"# - Charge Cutoff: {self.charge_cutoff} V\n")
|
|
self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n")
|
|
self.current_cycle_file.write(f"# - Rest Time: {self.rest_time} hours\n")
|
|
elif self.current_mode == "Discharge Test":
|
|
self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n")
|
|
|
|
self.current_cycle_file.write(f"# - Test Conditions: {test_conditions}\n")
|
|
self.current_cycle_file.write(f"# Results:\n")
|
|
|
|
if self.current_mode == "Cycle Test":
|
|
self.current_cycle_file.write(f"# - Cycles Completed: {self.cycle_count}\n")
|
|
self.current_cycle_file.write(f"# - Final Discharge Capacity: {self.capacity_ah:.4f} Ah\n")
|
|
self.current_cycle_file.write(f"# - Final Charge Capacity: {self.charge_capacity:.4f} Ah\n")
|
|
self.current_cycle_file.write(f"# - Coulombic Efficiency: {self.coulomb_efficiency:.1f}%\n")
|
|
else:
|
|
self.current_cycle_file.write(f"# - Capacity: {self.capacity_ah:.4f} Ah\n")
|
|
self.current_cycle_file.write(f"# - Energy: {self.energy:.4f} Wh\n")
|
|
|
|
self.current_cycle_file.close()
|
|
except Exception as e:
|
|
print(f"Error closing log file: {e}")
|
|
finally:
|
|
self.current_cycle_file = None
|
|
|
|
def format_time(self, seconds):
|
|
"""Convert seconds to hh:mm:ss format"""
|
|
hours = int(seconds // 3600)
|
|
minutes = int((seconds % 3600) // 60)
|
|
seconds = int(seconds % 60)
|
|
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
|
|
|
|
def stop_test(self):
|
|
"""Request immediate stop of the current test or monitoring"""
|
|
if not self.test_running and not (hasattr(self, 'record_button') and self.record_button.isChecked()):
|
|
return
|
|
|
|
self.request_stop = True
|
|
self.test_running = False
|
|
self.measuring = False
|
|
|
|
# Stop any active test threads
|
|
if hasattr(self, 'test_sequence_worker'):
|
|
try:
|
|
if not sip.isdeleted(self.test_sequence_worker):
|
|
self.test_sequence_worker.stop()
|
|
except:
|
|
pass
|
|
|
|
if hasattr(self, 'discharge_worker'):
|
|
try:
|
|
if not sip.isdeleted(self.discharge_worker):
|
|
self.discharge_worker.stop()
|
|
except:
|
|
pass
|
|
|
|
# Stop recording if active
|
|
if hasattr(self, 'record_button') and self.record_button.isChecked():
|
|
self.record_button.setChecked(False)
|
|
if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
|
|
self.finalize_log_file()
|
|
self.record_button.setText("Start Recording")
|
|
|
|
# Reset device to safe state
|
|
if hasattr(self, 'dev'):
|
|
try:
|
|
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['A'].constant(0)
|
|
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
|
except Exception as e:
|
|
print(f"Error resetting device: {e}")
|
|
|
|
# Clear all data buffers
|
|
with self.plot_mutex:
|
|
self.time_data.clear()
|
|
self.voltage_data.clear()
|
|
self.current_data.clear()
|
|
if hasattr(self, 'phase_data'):
|
|
self.phase_data.clear()
|
|
|
|
# Reset measurements
|
|
self.capacity_ah = 0.0
|
|
self.energy = 0.0
|
|
if hasattr(self, 'charge_capacity'):
|
|
self.charge_capacity = 0.0
|
|
if hasattr(self, 'coulomb_efficiency'):
|
|
self.coulomb_efficiency = 0.0
|
|
|
|
# Reset plot
|
|
self.reset_plot()
|
|
|
|
# Update UI
|
|
self.test_phase = "Idle"
|
|
self.phase_label.setText(self.test_phase)
|
|
self.stop_button.setEnabled(False)
|
|
self.start_button.setEnabled(True)
|
|
|
|
if self.current_mode == "Live Monitoring":
|
|
self.status_bar.showMessage("Live monitoring stopped")
|
|
else:
|
|
self.status_bar.showMessage("Test stopped - Ready for new test")
|
|
|
|
def finalize_test(self):
|
|
"""Final cleanup after test completes or is stopped"""
|
|
try:
|
|
# 1. Stop any active measurement or test operations
|
|
self.measuring = False
|
|
self.test_running = False
|
|
|
|
# 2. Reset device to safe state
|
|
if hasattr(self, 'dev'):
|
|
try:
|
|
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['A'].constant(0)
|
|
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
|
except Exception as e:
|
|
print(f"Error resetting device in finalize: {e}")
|
|
|
|
# 3. Clean up test sequence thread safely
|
|
if hasattr(self, 'test_sequence_thread'):
|
|
try:
|
|
if self.test_sequence_thread.isRunning():
|
|
if hasattr(self, 'test_sequence_worker'):
|
|
try:
|
|
self.test_sequence_worker.stop()
|
|
except RuntimeError:
|
|
pass
|
|
|
|
self.test_sequence_thread.quit()
|
|
self.test_sequence_thread.wait(500)
|
|
except Exception as e:
|
|
print(f"Error stopping test sequence thread: {e}")
|
|
finally:
|
|
if hasattr(self, 'test_sequence_worker'):
|
|
try:
|
|
if not sip.isdeleted(self.test_sequence_worker):
|
|
self.test_sequence_worker.deleteLater()
|
|
except:
|
|
pass
|
|
|
|
if hasattr(self, 'test_sequence_thread'):
|
|
try:
|
|
if not sip.isdeleted(self.test_sequence_thread):
|
|
self.test_sequence_thread.deleteLater()
|
|
except:
|
|
pass
|
|
finally:
|
|
if hasattr(self, 'test_sequence_thread'):
|
|
del self.test_sequence_thread
|
|
|
|
# 4. Clean up discharge thread safely
|
|
if hasattr(self, 'discharge_thread'):
|
|
try:
|
|
if self.discharge_thread.isRunning():
|
|
if hasattr(self, 'discharge_worker'):
|
|
try:
|
|
self.discharge_worker.stop()
|
|
except RuntimeError:
|
|
pass
|
|
|
|
self.discharge_thread.quit()
|
|
self.discharge_thread.wait(500)
|
|
except Exception as e:
|
|
print(f"Error stopping discharge thread: {e}")
|
|
finally:
|
|
if hasattr(self, 'discharge_worker'):
|
|
try:
|
|
if not sip.isdeleted(self.discharge_worker):
|
|
self.discharge_worker.deleteLater()
|
|
except:
|
|
pass
|
|
|
|
if hasattr(self, 'discharge_thread'):
|
|
try:
|
|
if not sip.isdeleted(self.discharge_thread):
|
|
self.discharge_thread.deleteLater()
|
|
except:
|
|
pass
|
|
finally:
|
|
if hasattr(self, 'discharge_thread'):
|
|
del self.discharge_thread
|
|
|
|
# 5. Clean up charge thread safely (using same pattern as discharge thread)
|
|
if hasattr(self, 'charge_thread'):
|
|
try:
|
|
if self.charge_thread.isRunning():
|
|
if hasattr(self, 'charge_worker'):
|
|
try:
|
|
self.charge_worker.stop()
|
|
except RuntimeError:
|
|
pass
|
|
|
|
self.charge_thread.quit()
|
|
self.charge_thread.wait(500)
|
|
except Exception as e:
|
|
print(f"Error stopping charge thread: {e}")
|
|
finally:
|
|
if hasattr(self, 'charge_worker'):
|
|
try:
|
|
if not sip.isdeleted(self.charge_worker):
|
|
self.charge_worker.deleteLater()
|
|
except:
|
|
pass
|
|
|
|
if hasattr(self, 'charge_thread'):
|
|
try:
|
|
if not sip.isdeleted(self.charge_thread):
|
|
self.charge_thread.deleteLater()
|
|
except:
|
|
pass
|
|
finally:
|
|
if hasattr(self, 'charge_thread'):
|
|
del self.charge_thread
|
|
|
|
# 6. Finalize log file
|
|
self.finalize_log_file()
|
|
|
|
# 7. Reset UI and state
|
|
self.request_stop = False
|
|
self.start_button.setEnabled(True)
|
|
self.stop_button.setEnabled(False)
|
|
|
|
# 8. Show completion message if test wasn't stopped by user
|
|
if not self.request_stop:
|
|
test_current = self.c_rate * self.capacity
|
|
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
|
|
|
|
if self.current_mode == "Cycle Test":
|
|
message = (
|
|
f"Cycle test completed | "
|
|
f"Cycle {self.cycle_count} | "
|
|
f"Capacity: {self.capacity_ah:.4f}Ah | "
|
|
f"Efficiency: {self.coulomb_efficiency:.1f}%"
|
|
)
|
|
|
|
QMessageBox.information(
|
|
self,
|
|
"Test Completed",
|
|
f"Cycle test completed successfully.\n\n"
|
|
f"Test Parameters:\n"
|
|
f"- Capacity: {self.capacity} Ah\n"
|
|
f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n"
|
|
f"- Charge Cutoff: {self.charge_cutoff} V\n"
|
|
f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
|
|
f"- Conditions: {test_conditions}\n\n"
|
|
f"Results:\n"
|
|
f"- Cycles: {self.cycle_count}\n"
|
|
f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n"
|
|
f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%"
|
|
)
|
|
elif self.current_mode == "Discharge Test":
|
|
message = (
|
|
f"Discharge completed | "
|
|
f"Capacity: {self.capacity_ah:.4f}Ah | "
|
|
f"Energy: {self.energy:.4f}Wh"
|
|
)
|
|
|
|
QMessageBox.information(
|
|
self,
|
|
"Discharge Completed",
|
|
f"Discharge test completed successfully.\n\n"
|
|
f"Test Parameters:\n"
|
|
f"- Capacity: {self.capacity} Ah\n"
|
|
f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n"
|
|
f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
|
|
f"- Conditions: {test_conditions}\n\n"
|
|
f"Results:\n"
|
|
f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n"
|
|
f"- Energy delivered: {self.energy:.4f}Wh"
|
|
)
|
|
|
|
self.status_bar.showMessage(message)
|
|
|
|
except Exception as e:
|
|
print(f"Error in finalize_test: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
# Ensure we don't leave the UI in a locked state
|
|
self.start_button.setEnabled(True)
|
|
self.stop_button.setEnabled(False)
|
|
self.status_bar.showMessage("Error during test finalization")
|
|
|
|
def reset_plot(self):
|
|
"""Completely reset the plot - clears all data and visuals"""
|
|
# 1. Clear line data
|
|
self.line_voltage.set_data([], [])
|
|
self.line_current.set_data([], [])
|
|
|
|
# 2. Clear data buffers
|
|
self.time_data.clear()
|
|
self.voltage_data.clear()
|
|
self.current_data.clear()
|
|
if hasattr(self, 'phase_data'):
|
|
self.phase_data.clear()
|
|
|
|
# 3. Reset axes with appropriate ranges
|
|
voltage_padding = 0.2
|
|
min_voltage = 0
|
|
max_voltage = 5.0 # Max voltage for ADALM1000
|
|
|
|
self.ax.set_xlim(0, 10) # Reset X axis
|
|
self.ax.set_ylim(min_voltage, max_voltage)
|
|
|
|
# Reset twin axis (current)
|
|
current_padding = 0.05
|
|
self.ax2.set_xlim(0, 10)
|
|
self.ax2.set_ylim(-0.25 - current_padding, 0.25 + current_padding)
|
|
|
|
# 4. Clear any matplotlib internal caches
|
|
self.fig.canvas.draw_idle()
|
|
self.fig.canvas.flush_events()
|
|
|
|
# 5. Force immediate redraw
|
|
self.canvas.draw()
|
|
|
|
def update_plot(self):
|
|
"""More reliable plotting with better error handling"""
|
|
try:
|
|
# Create local copies safely
|
|
with self.plot_mutex:
|
|
if not self.time_data or not self.voltage_data or not self.current_data:
|
|
return
|
|
|
|
if len(self.time_data) != len(self.voltage_data) or len(self.time_data) != len(self.current_data):
|
|
# Find the minimum length to avoid mismatch
|
|
min_len = min(len(self.time_data), len(self.voltage_data), len(self.current_data))
|
|
x_data = np.array(self.time_data[-min_len:])
|
|
y1_data = np.array(self.voltage_data[-min_len:])
|
|
y2_data = np.array(self.current_data[-min_len:])
|
|
else:
|
|
x_data = np.array(self.time_data)
|
|
y1_data = np.array(self.voltage_data)
|
|
y2_data = np.array(self.current_data)
|
|
|
|
# Update plot data
|
|
self.line_voltage.set_data(x_data, y1_data)
|
|
self.line_current.set_data(x_data, y2_data)
|
|
|
|
# Auto-scale when needed
|
|
if len(x_data) > 0 and x_data[-1] > self.ax.get_xlim()[1] * 0.8:
|
|
self.auto_scale_axes()
|
|
|
|
# Force redraw
|
|
self.canvas.draw_idle()
|
|
|
|
except Exception as e:
|
|
print(f"Plot update error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
# Reset plot on error
|
|
with self.plot_mutex:
|
|
self.line_voltage.set_data([], [])
|
|
self.line_current.set_data([], [])
|
|
self.canvas.draw_idle()
|
|
|
|
def auto_scale_axes(self):
|
|
"""Auto-scale plot axes with appropriate padding and strict boundaries"""
|
|
if not self.time_data:
|
|
return
|
|
|
|
min_time = 0
|
|
max_time = self.time_data[-1]
|
|
current_xlim = self.ax.get_xlim()
|
|
|
|
if max_time > current_xlim[1] * 0.95:
|
|
new_max = max_time * 1.05
|
|
self.ax.set_xlim(min_time, new_max)
|
|
self.ax2.set_xlim(min_time, new_max)
|
|
|
|
voltage_padding = 0.2
|
|
if self.voltage_data:
|
|
min_voltage = max(0, min(self.voltage_data) - voltage_padding)
|
|
max_voltage = min(5.0, max(self.voltage_data) + voltage_padding)
|
|
current_ylim = self.ax.get_ylim()
|
|
if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1):
|
|
self.ax.set_ylim(min_voltage, max_voltage)
|
|
|
|
current_padding = 0.05
|
|
if self.current_data:
|
|
min_current = max(-0.25, min(self.current_data) - current_padding)
|
|
max_current = min(0.25, max(self.current_data) + current_padding)
|
|
current_ylim2 = self.ax2.get_ylim()
|
|
if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02):
|
|
self.ax2.set_ylim(min_current, max_current)
|
|
|
|
@pyqtSlot(str)
|
|
def handle_device_error(self, error):
|
|
"""Handle errors with proper state management"""
|
|
error_msg = str(error)
|
|
print(f"Fehler: {error_msg}")
|
|
|
|
# Special cases
|
|
if "resource busy" in error_msg.lower():
|
|
error_msg = "ADALM1000 is already in use"
|
|
elif "no samples" in error_msg.lower():
|
|
error_msg = "No measurments - Check connection"
|
|
|
|
# Cleanup
|
|
if hasattr(self, 'session'):
|
|
try:
|
|
if self.session_active:
|
|
self.session.end()
|
|
del self.session
|
|
except:
|
|
pass
|
|
|
|
# Update UI
|
|
self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
|
|
self.connection_label.setText("Disconnected")
|
|
self.status_bar.showMessage(f"Error: {error_msg}")
|
|
|
|
# Disable controls
|
|
self.session_active = False
|
|
self.test_running = False
|
|
self.start_button.setEnabled(False)
|
|
self.stop_button.setEnabled(False)
|
|
|
|
# Attempt recovery for certain errors
|
|
if "no samples" in error_msg.lower() or "resource busy" in error_msg.lower():
|
|
QTimer.singleShot(3000, self.reconnect_device)
|
|
|
|
@pyqtSlot(str)
|
|
def update_test_phase(self, phase_text):
|
|
"""Update the test phase display"""
|
|
self.test_phase = phase_text
|
|
self.phase_label.setText(phase_text)
|
|
|
|
@pyqtSlot(str)
|
|
def handle_test_error(self, error_msg):
|
|
"""Handle errors from the test sequence with complete cleanup"""
|
|
try:
|
|
# 1. Notify user
|
|
QMessageBox.critical(self, "Test Error",
|
|
f"An error occurred:\n{error_msg}\n\nAttempting to recover...")
|
|
|
|
# 2. Stop all operations
|
|
self.stop_test()
|
|
|
|
# 3. Reset UI elements
|
|
if hasattr(self, 'line_voltage'):
|
|
try:
|
|
self.line_voltage.set_data([], [])
|
|
self.line_current.set_data([], [])
|
|
self.ax.set_xlim(0, 1)
|
|
self.ax2.set_xlim(0, 1)
|
|
self.canvas.draw()
|
|
except Exception as plot_error:
|
|
print(f"Plot reset error: {plot_error}")
|
|
|
|
# 4. Update status
|
|
self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...")
|
|
self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")
|
|
|
|
# 5. Attempt recovery
|
|
QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect
|
|
|
|
except Exception as e:
|
|
print(f"Error in error handler: {e}")
|
|
# Fallback - restart application?
|
|
QMessageBox.critical(self, "Fatal Error",
|
|
"The application needs to restart due to an unrecoverable error")
|
|
QTimer.singleShot(1000, self.close)
|
|
|
|
def attempt_reconnect(self):
|
|
"""Attempt to reconnect automatically"""
|
|
QMessageBox.critical(
|
|
self,
|
|
"Device Connection Error",
|
|
"Could not connect to ADALM1000\n\n"
|
|
"1. Check USB cable connection\n"
|
|
"2. The device will attempt to reconnect automatically"
|
|
)
|
|
|
|
QTimer.singleShot(1000, self.reconnect_device)
|
|
|
|
def reconnect_device(self):
|
|
"""Reconnect the device with proper cleanup"""
|
|
self.status_bar.showMessage("Attempting to reconnect...")
|
|
|
|
if hasattr(self, 'session'):
|
|
try:
|
|
if self.session_active:
|
|
self.session.end()
|
|
del self.session
|
|
except:
|
|
pass
|
|
|
|
self.test_running = False
|
|
self.continuous_mode = False
|
|
self.measuring = False
|
|
|
|
if hasattr(self, 'measurement_thread'):
|
|
self.measurement_thread.stop()
|
|
|
|
time.sleep(1.5)
|
|
|
|
try:
|
|
self.init_device()
|
|
if self.session_active:
|
|
self.status_bar.showMessage("Reconnected successfully")
|
|
return
|
|
except Exception as e:
|
|
print(f"Reconnect failed: {e}")
|
|
|
|
self.status_bar.showMessage("Reconnect failed - will retry...")
|
|
QTimer.singleShot(2000, self.reconnect_device)
|
|
|
|
def closeEvent(self, event):
|
|
"""Clean up on window close"""
|
|
self.test_running = False
|
|
self.measuring = False
|
|
self.session_active = False
|
|
|
|
# Stop measurement thread
|
|
if hasattr(self, 'measurement_thread'):
|
|
self.measurement_thread.stop()
|
|
|
|
# Stop test sequence thread
|
|
if hasattr(self, 'test_sequence_thread'):
|
|
if hasattr(self, 'test_sequence_worker'):
|
|
self.test_sequence_worker.stop()
|
|
self.test_sequence_thread.quit()
|
|
self.test_sequence_thread.wait(500)
|
|
|
|
# Stop discharge thread
|
|
if hasattr(self, 'discharge_thread'):
|
|
if hasattr(self, 'discharge_worker'):
|
|
self.discharge_worker.stop()
|
|
self.discharge_thread.quit()
|
|
self.discharge_thread.wait(500)
|
|
|
|
# Clean up device session
|
|
if hasattr(self, 'session') and self.session:
|
|
try:
|
|
self.session.end()
|
|
except Exception as e:
|
|
print(f"Error ending session: {e}")
|
|
|
|
event.accept()
|
|
|
|
if __name__ == "__main__":
|
|
app = QApplication([])
|
|
try:
|
|
window = BatteryTester()
|
|
window.show()
|
|
app.exec_()
|
|
except Exception as e:
|
|
QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}") |