Adalm1000_Battery_SMU/MainCode/adalm1000_logger.py
Jan 716955900b MainCode/adalm1000_logger.py aktualisiert
it works but only after switching device
D
2025-08-08 13:43:05 +02:00

2972 lines
118 KiB
Python

# -*- coding: utf-8 -*-
import os
import time
import csv
import threading
import traceback
from datetime import datetime
import numpy as np
import matplotlib
import subprocess
import logging
import sys
from usb.core import USBError
matplotlib.use('Qt5Agg')
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
from collections import deque
from queue import Queue, Full, Empty
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QGridLayout, QLabel, QPushButton, QLineEdit, QCheckBox,
QFrame, QMessageBox, QFileDialog, QComboBox)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread
from PyQt5.QtGui import QDoubleValidator
from PyQt5 import sip
from PyQt5.QtCore import pyqtSignal
import pysmu
from pysmu import Session
import matplotlib as mpl
# Global matplotlib styling: sans-serif font stack and a light grey (#D8DEE9)
# for axes edges, text, axis labels and tick marks.
mpl.rcParams['font.family'] = 'sans-serif'
mpl.rcParams['font.sans-serif'] = ['DejaVu Sans', 'Arial', 'Liberation Sans', 'Verdana'] # Fallback fonts
mpl.rcParams['axes.edgecolor'] = '#D8DEE9'
mpl.rcParams['text.color'] = '#D8DEE9'
mpl.rcParams['axes.labelcolor'] = '#D8DEE9'
mpl.rcParams['xtick.color'] = '#D8DEE9'
mpl.rcParams['ytick.color'] = '#D8DEE9'
# Only WARNING and above from matplotlib's font manager reach the handlers.
logging.getLogger('matplotlib.font_manager').setLevel(logging.WARNING)
# Root logging at DEBUG level, sent to stdout and appended to
# "adalm_debug.log" (a relative path, i.e. the current working directory).
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("adalm_debug.log", mode="a")
    ]
)
# Module-level logger used by the classes defined below.
logger = logging.getLogger("adalm")
class DeviceDisconnectedError(Exception):
    """Signals that communication with the measurement device was lost."""
class DeviceManager:
    """State holder and lock-protected pysmu wrapper for a single device.

    Keeps the pysmu session/device handles for one serial number, the
    ``MeasurementThread`` that samples it, rolling data buffers, accumulated
    statistics and the recording (log file) state.
    """

    def __init__(self, session=None, dev=None, serial=None):
        """Create the manager together with its (not yet started) thread.

        Args:
            session: pysmu session the device belongs to, or None.
            dev: opened pysmu device handle, or None.
            serial: serial number; used in log messages and to find the
                device again in ``reopen_with_backoff``.
        """
        self.session = session
        self.dev = dev
        self.serial = serial
        self.lock = threading.RLock()  # reentrant lock
        self._last_open_attempt = 0
        logger.info(f"Created DeviceManager for {serial}")
        # _get_font is a private matplotlib helper; tolerate builds where it
        # is missing instead of failing device construction.
        try:
            mpl.font_manager._get_font.cache_clear()
        except AttributeError:
            logger.debug("matplotlib font cache not available; skipping clear")
        self.measurement_thread = MeasurementThread(self, 0.1, self)
        # Data buffers: full history and a shorter window for display
        self.time_data = deque(maxlen=10000)
        self.voltage_data = deque(maxlen=10000)
        self.current_data = deque(maxlen=10000)
        self.display_time_data = deque(maxlen=1000)
        self.display_voltage_data = deque(maxlen=1000)
        self.display_current_data = deque(maxlen=1000)
        # Accumulated statistics
        self.capacity_ah = 0.0
        self.energy = 0.0
        self.charge_capacity = 0.0
        self.coulomb_efficiency = 0.0
        self.cycle_count = 0
        self.test_phase = "Idle"
        # Timing: start_time stays None here; MeasurementThread.run() sets it
        # when sampling begins.
        self.start_time = None
        self.last_update_time = time.time()
        self.last_measurement_time = None
        # Recording state
        self.is_recording = False
        self.log_file = None
        self.log_writer = None
        self._last_log_time = 0
        self.log_dir = os.path.expanduser("~/adalm1000/logs")
        os.makedirs(self.log_dir, exist_ok=True)

    def reset_data(self):
        """Reset all data buffers and statistics for the device"""
        for buffer in (self.time_data, self.voltage_data, self.current_data,
                       self.display_time_data, self.display_voltage_data,
                       self.display_current_data):
            buffer.clear()
        self.capacity_ah = 0.0
        self.energy = 0.0
        self.charge_capacity = 0.0
        self.coulomb_efficiency = 0.0
        self.cycle_count = 0
        self.test_phase = "Idle"
        # Reset recording state (log_file / log_writer are left untouched)
        self.is_recording = False
        self._last_log_time = 0

    def safe_call(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` while holding the device lock.

        Returns:
            Whatever ``fn`` returns.

        Raises:
            DeviceDisconnectedError: if ``fn`` raises a USB error or OSError.
            Exception: any other exception from ``fn`` is logged and re-raised.
        """
        with self.lock:
            try:
                return fn(*args, **kwargs)
            # NOTE(review): confirm that pysmu.exceptions exposes USBError.
            except (USBError, OSError, pysmu.exceptions.USBError) as e:
                logger.exception(f"USB error in safe_call for {self.serial}")
                raise DeviceDisconnectedError(f"USB error: {e}") from e
            except Exception:
                logger.exception(f"Error in safe_call for {self.serial}")
                raise

    def set_simv_current(self, current):
        """Set channel A to SIMV and apply current.

        NOTE(review): the magnitude ``abs(current)`` is what gets applied, so
        the sign callers pass (negative for discharge) is discarded here.
        Confirm that this is intended.
        """
        def _do():
            chA = self.dev.channels['A']
            if hasattr(chA, 'Mode'):
                try:
                    chA.mode = chA.Mode.SIMV
                except Exception:
                    logger.debug("Couldn't set chA.mode to SIMV")
            if hasattr(chA, 'constant'):
                chA.constant(abs(current))
            else:
                try:
                    chA.set_current(abs(current))
                except Exception:
                    logger.debug("No channel.constant or set_current available")
            logger.debug(f"Device {self.serial}: set_simv_current {current}")
        return self.safe_call(_do)

    def hi_z(self):
        """Switch channel A to HI_Z and set its constant output to 0."""
        def _do():
            chA = self.dev.channels['A']
            if hasattr(chA, 'Mode'):
                try:
                    chA.mode = chA.Mode.HI_Z
                except Exception:
                    logger.debug("Couldn't set chA.mode to HI_Z")
            if hasattr(chA, 'constant'):
                chA.constant(0)
            logger.debug(f"Device {self.serial}: hi_z()")
        return self.safe_call(_do)

    def close(self):
        """Close session/device references safely."""
        with self.lock:
            try:
                if self.dev:
                    try:
                        if hasattr(self.dev, 'close'):
                            self.dev.close()
                    except Exception:
                        logger.exception(f"Error closing device {self.serial}")
                    self.dev = None
                if self.session:
                    try:
                        if hasattr(self.session, 'close'):
                            self.session.close()
                    except Exception:
                        logger.exception(f"Error closing session for {self.serial}")
                    self.session = None
            except Exception:
                logger.exception(f"Error in DeviceManager.close for {self.serial}")

    @staticmethod
    def _discard_session(session):
        """Best-effort close of a session that will not be kept."""
        if session is None or not hasattr(session, 'close'):
            return
        try:
            session.close()
        except Exception:
            logger.exception("Error closing unused session")

    def reopen_with_backoff(self, max_attempts=5, base_delay=1.0):
        """Try to reopen the device, waiting longer after each failure.

        The pause after failed attempt ``k`` is ``base_delay * k`` seconds;
        there is no pause after the final attempt.

        Args:
            max_attempts: number of attempts before giving up.
            base_delay: base pause in seconds.

        Returns:
            True if a device was opened and installed as ``self.dev``,
            otherwise False.
        """
        # Throttle calls that arrive less than 0.2 s apart.
        if time.time() - self._last_open_attempt < 0.2:
            time.sleep(0.2)
        self._last_open_attempt = time.time()
        for attempt in range(1, max_attempts + 1):
            new_session = None
            try:
                logger.info(f"Attempting to reopen device {self.serial} (attempt {attempt}/{max_attempts})")
                from pysmu import Session
                new_session = Session()
                devs = new_session.list_devices()
                chosen = None
                for d in devs:
                    try:
                        if hasattr(d, 'serial') and d.serial == self.serial:
                            chosen = new_session.open(d.serial)
                            break
                        if isinstance(d, (list, tuple)) and str(d[0]) == str(self.serial):
                            chosen = new_session.open(str(d[0]))
                            break
                    except Exception:
                        continue
                if chosen is None and devs:
                    logger.warning(f"Could not find serial {self.serial}; opening first device")
                    first = devs[0]
                    chosen = new_session.open(first.serial if hasattr(first, 'serial') else str(first[0]))
                if chosen:
                    with self.lock:
                        try:
                            if self.dev and hasattr(self.dev, 'close'):
                                self.dev.close()
                        except Exception:
                            logger.exception("Error closing previous dev")
                        # The previous session is deliberately not closed:
                        # it was handed in from outside and may be in use
                        # elsewhere.
                        self.session = new_session
                        self.dev = chosen
                    logger.info(f"Reopened device {self.serial} OK")
                    return True
                # Nothing could be opened: do not leak the probe session.
                self._discard_session(new_session)
            except Exception as e:
                logger.exception(f"Reopen attempt {attempt} failed for {self.serial}: {e}")
                self._discard_session(new_session)
            if attempt < max_attempts:
                time.sleep(base_delay * attempt)
        logger.error(f"Failed to reopen device {self.serial} after {max_attempts} attempts")
        return False
class MeasurementThread(QThread):
    """Background thread that samples one device and publishes each reading.

    Every accepted sample is emitted through ``update_signal`` as
    ``(serial, voltage, current, timestamp)`` and is also placed in
    ``measurement_queue`` as ``(voltage, current, timestamp)`` for consumers
    that poll the queue.
    """

    update_signal = pyqtSignal(str, float, float, float)
    error_signal = pyqtSignal(str)

    def __init__(self, device_manager, interval, parent_manager):
        """
        Args:
            device_manager: object providing ``dev``, ``hi_z()`` and
                ``start_time``.
            interval: target time between samples, in seconds.
            parent_manager: object whose ``serial`` labels emitted samples.
        """
        super().__init__()
        self.dev_manager = device_manager
        self.interval = interval
        self._running = True
        self.filter_window_size = 10
        self.voltage_window = []
        self.current_window = []
        self.start_time = None
        # Holds at most the newest sample; see _publish_latest().
        self.measurement_queue = Queue(maxsize=1)
        self.current_direction = 1
        self.parent_manager = parent_manager

    def start(self, priority=QThread.InheritPriority):
        """Start the thread, re-arming the run flag first.

        ``stop()`` clears ``_running``; without re-arming it here a restarted
        thread would leave its loop immediately.
        """
        self._running = True
        super().start(priority)

    def stop(self):
        """Ask the loop to end, put the device in hi-Z and wait up to 500 ms."""
        self._running = False
        try:
            self.dev_manager.hi_z()
        except Exception as e:
            logger.error(f"Error stopping device: {e}")
        self.wait(500)

    def _publish_latest(self, item):
        """Replace the queue content with ``item`` without ever blocking."""
        try:
            self.measurement_queue.get_nowait()  # drop the stale sample
        except Empty:
            pass
        try:
            self.measurement_queue.put_nowait(item)
        except Full:
            # Another producer filled the slot in between; keep its sample.
            pass

    def run(self):
        """Sampling loop; runs until ``stop()`` clears the run flag."""
        logger.info(f"MeasurementThread STARTED for {self.parent_manager.serial}")
        if not self.dev_manager.dev:
            logger.error("No device available")
            self.error_signal.emit("Device not initialized")
            return
        # Initialize timing
        self.dev_manager.start_time = time.time()
        while self._running:
            try:
                start_time = time.time()
                # NOTE(review): the device is read here without taking
                # dev_manager.lock, unlike set_simv_current()/hi_z().
                samples = self.dev_manager.dev.get_samples(1)
                if not samples:
                    time.sleep(0.01)
                    continue
                sample = samples[0]
                voltage = sample[0][0]
                current = sample[0][1]
                current_time = time.time()
                # Discard readings outside 0..5 V / -0.2..0.2 A. Pause
                # briefly so a run of bad readings does not spin the loop.
                if not (0 <= voltage <= 5.0) or not (-0.2 <= current <= 0.2):
                    time.sleep(0.01)
                    continue
                # Make the sample available to queue consumers.
                self._publish_latest((voltage, current, current_time))
                self.update_signal.emit(
                    self.parent_manager.serial,
                    voltage,
                    current,
                    current_time
                )
                # Sleep for the remainder of the interval (at least 1 ms)
                elapsed = time.time() - start_time
                sleep_time = max(0.001, self.interval - elapsed)
                time.sleep(sleep_time)
            except (USBError, pysmu.exceptions.USBError) as e:
                logger.error(f"USB Error: {str(e)}")
                self.error_signal.emit("USB communication error")
                time.sleep(1)
            except Exception as e:
                logger.error(f"Measurement error: {str(e)}")
                time.sleep(0.5)

    def _do_measure_once(self):
        """Read one sample; intended to be run via DeviceManager.safe_call.

        Returns:
            ``(voltage, current, timestamp)`` for channel A, or
            ``(0, 0, timestamp)`` when the device returned no samples.
        """
        samples = self.dev_manager.dev.get_samples(1)
        if samples:
            sample = samples[0]
            voltage = sample[0][0]  # Channel A voltage
            current = sample[0][1]  # Channel A current
            return voltage, current, time.time()
        return 0, 0, time.time()

    def set_direction(self, direction):
        """Store the current direction flag."""
        self.current_direction = direction
class TestSequenceWorker(QThread):
    """Worker thread running discharge -> rest -> charge -> rest cycles.

    Voltage readings are taken from the measurement thread's
    ``measurement_queue``. A phase ends when its cutoff voltage is reached or
    ``stop()`` is called.
    """

    finished = pyqtSignal()
    update_phase = pyqtSignal(str)
    update_status = pyqtSignal(str)
    test_completed = pyqtSignal()
    error_occurred = pyqtSignal(str)

    def __init__(self, device_manager, test_current, charge_cutoff, discharge_cutoff,
                 rest_time_hours, continuous_mode, parent=None):
        """
        Args:
            device_manager: DeviceManager instance driving the hardware.
            test_current: current magnitude used for both phases.
            charge_cutoff: voltage at or above which charging stops.
            discharge_cutoff: voltage at or below which discharging stops.
            rest_time_hours: rest duration after each phase, in hours.
            continuous_mode: repeat cycles until stopped when true.
            parent: optional Qt parent.
        """
        super().__init__(parent)
        self.dev_manager = device_manager  # DeviceManager instance
        self.device = device_manager.dev  # raw pysmu device for direct calls
        self.test_current = float(test_current)
        self.charge_cutoff = float(charge_cutoff)
        self.discharge_cutoff = float(discharge_cutoff)
        self.rest_time = float(rest_time_hours) * 3600.0  # hours -> seconds
        self.continuous_mode = bool(continuous_mode)
        # NOTE(review): this instance attribute hides the parent() accessor
        # inherited from the Qt base class; kept because external code may
        # read it.
        self.parent = parent
        self._running = True
        self.voltage_timeout = 0.5  # seconds to wait for a queued sample

    def stop(self):
        """Request the worker to finish at the next loop check."""
        self._running = False

    def _set_direction_on_measurement_thread(self, direction):
        """Best-effort: forward the current direction to the sampler."""
        try:
            mt = getattr(self.dev_manager, 'measurement_thread', None)
            if mt and hasattr(mt, 'set_direction'):
                mt.set_direction(direction)
        except Exception:
            # Still best-effort, but no longer invisible.
            logger.debug("Could not set direction on measurement thread", exc_info=True)

    def _set_simv_current(self, current):
        """Apply ``current`` via the device manager; re-raises on disconnect."""
        try:
            self.dev_manager.set_simv_current(current)
        except DeviceDisconnectedError:
            logger.error("Device disconnected during current set")
            raise

    def _hi_z(self):
        """Put the device in hi-Z; re-raises on disconnect."""
        try:
            self.dev_manager.hi_z()
        except DeviceDisconnectedError:
            logger.error("Device disconnected during hi-z")
            raise

    def get_latest_measurement(self):
        """Fetch one queued sample, waiting up to ``voltage_timeout`` seconds.

        Accepts ``(serial, v, i[, ts])`` as well as ``(v, i[, ts])`` items.

        Returns:
            ``(voltage, current, timestamp)``; all three are None when no
            usable sample is available.
        """
        try:
            q = self.dev_manager.measurement_thread.measurement_queue
            item = q.get(timeout=self.voltage_timeout)
            if isinstance(item, (tuple, list)):
                if len(item) >= 3 and isinstance(item[0], str):
                    _, v, i, *rest = item
                    ts = rest[0] if rest else None
                    return v, i, ts
                elif len(item) >= 2:
                    v = item[0]
                    i = item[1]
                    ts = item[2] if len(item) >= 3 else None
                    return v, i, ts
            return None, None, None
        except Empty:
            return None, None, None
        except Exception:
            # Polled several times per second by the phase loops, so this
            # stays quiet on purpose.
            return None, None, None

    def charge_phase(self):
        """Charge until the voltage reaches ``charge_cutoff`` or stop()."""
        self.update_phase.emit("Charge")
        self._set_direction_on_measurement_thread(1)
        self._set_simv_current(abs(self.test_current))
        while self._running:
            v, i, ts = self.get_latest_measurement()
            if v is None:
                time.sleep(0.05)
                continue
            if v >= self.charge_cutoff:
                break
            time.sleep(0.1)
        self._hi_z()

    def discharge_phase(self):
        """Discharge until the voltage falls to ``discharge_cutoff`` or stop()."""
        self.update_phase.emit("Discharge")
        self._set_direction_on_measurement_thread(-1)
        self._set_simv_current(-abs(self.test_current))
        while self._running:
            v, i, ts = self.get_latest_measurement()
            if v is None:
                time.sleep(0.05)
                continue
            if v <= self.discharge_cutoff:
                break
            time.sleep(0.1)
        self._hi_z()

    def rest_phase(self, phase_name):
        """Idle for ``rest_time`` seconds, reporting the remaining time."""
        self.update_phase.emit(f"Rest: {phase_name}")
        rest_end = time.time() + self.rest_time
        while time.time() < rest_end and self._running:
            time.sleep(0.5)
            self.update_status.emit(f"Resting | {phase_name} | {max(0, rest_end-time.time()):.0f}s left")

    def run(self):
        """Run cycles; always attempts hi-Z and emits ``finished`` at the end."""
        try:
            while self._running:
                self.discharge_phase()
                if not self._running:
                    break
                if self.rest_time > 0:
                    self.rest_phase("Post-Discharge")
                if not self._running:
                    break
                self.charge_phase()
                if not self._running:
                    break
                if self.rest_time > 0:
                    self.rest_phase("Post-Charge")
                if not self.continuous_mode:
                    break
            self.test_completed.emit()
        except Exception as e:
            self.error_occurred.emit(str(e))
        finally:
            try:
                self._hi_z()
            except Exception:
                # Cleanup must not raise, but a failed hi-Z is worth a trace.
                logger.exception("Failed to set hi-Z after test sequence")
            self.finished.emit()
class DischargeWorker(QThread):
    """Worker thread that discharges until a cutoff voltage is reached.

    Voltage readings are taken from the measurement thread's
    ``measurement_queue``.
    """

    finished = pyqtSignal()
    update_status = pyqtSignal(str)
    test_completed = pyqtSignal()
    error_occurred = pyqtSignal(str)

    def __init__(self, device_manager, discharge_current, cutoff_voltage, parent=None):
        """
        Args:
            device_manager: DeviceManager instance driving the hardware.
            discharge_current: discharge current magnitude.
            cutoff_voltage: voltage at or below which the run ends.
            parent: optional Qt parent.
        """
        super().__init__(parent)
        self.dev_manager = device_manager
        self.device = device_manager.dev
        self.discharge_current = float(discharge_current)
        self.cutoff_voltage = float(cutoff_voltage)
        self._running = True
        self.voltage_timeout = 0.5  # seconds to wait for a queued sample

    def stop(self):
        """Request the worker to finish at the next loop check."""
        self._running = False

    def _set_direction_on_measurement_thread(self, direction):
        """Best-effort: forward the current direction to the sampler."""
        try:
            mt = getattr(self.dev_manager, 'measurement_thread', None)
            if mt and hasattr(mt, 'set_direction'):
                mt.set_direction(direction)
        except Exception:
            logger.debug("Could not set direction on measurement thread", exc_info=True)

    def _set_simv_current(self, current):
        """Apply ``current`` via the device manager; re-raises on disconnect."""
        try:
            self.dev_manager.set_simv_current(current)
        except DeviceDisconnectedError:
            logger.error("Device disconnected during current set")
            raise

    def _hi_z(self):
        """Put the device in hi-Z; re-raises on disconnect."""
        try:
            self.dev_manager.hi_z()
        except DeviceDisconnectedError:
            logger.error("Device disconnected during hi-z")
            raise

    def get_latest_measurement(self):
        """Fetch one queued sample, waiting up to ``voltage_timeout`` seconds.

        Returns:
            ``(voltage, current, timestamp)``; all three are None when no
            usable sample is available.
        """
        try:
            q = self.dev_manager.measurement_thread.measurement_queue
            item = q.get(timeout=self.voltage_timeout)
            if isinstance(item, (tuple, list)):
                if len(item) >= 3 and isinstance(item[0], str):
                    _, v, i, *rest = item
                    ts = rest[0] if rest else None
                    return v, i, ts
                elif len(item) >= 2:
                    v = item[0]
                    i = item[1]
                    ts = item[2] if len(item) >= 3 else None
                    return v, i, ts
            return None, None, None
        except Empty:
            return None, None, None
        except Exception:
            # Polled several times per second, so this stays quiet on purpose.
            return None, None, None

    def run(self):
        """Discharge until cutoff or stop(); always attempts hi-Z afterwards."""
        try:
            self._set_direction_on_measurement_thread(-1)
            self._set_simv_current(-abs(self.discharge_current))
            while self._running:
                v, i, ts = self.get_latest_measurement()
                if v is None:
                    time.sleep(0.05)
                    continue
                if v <= self.cutoff_voltage:
                    break
                time.sleep(0.1)
            self.test_completed.emit()
        except Exception as e:
            self.error_occurred.emit(str(e))
        finally:
            try:
                self._hi_z()
            except Exception:
                # Cleanup must not raise, but a failed hi-Z is worth a trace.
                logger.exception("Failed to set hi-Z after discharge")
            self.finished.emit()
class ChargeWorker(QThread):
    """Worker thread that charges until a cutoff voltage is reached.

    Voltage readings are taken from the measurement thread's
    ``measurement_queue``.
    """

    finished = pyqtSignal()
    update_status = pyqtSignal(str)
    test_completed = pyqtSignal()
    error_occurred = pyqtSignal(str)

    def __init__(self, device_manager, charge_current, cutoff_voltage, parent=None):
        """
        Args:
            device_manager: DeviceManager instance driving the hardware.
            charge_current: charge current magnitude.
            cutoff_voltage: voltage at or above which the run ends.
            parent: optional Qt parent.
        """
        super().__init__(parent)
        self.dev_manager = device_manager
        self.device = device_manager.dev
        self.charge_current = float(charge_current)
        self.cutoff_voltage = float(cutoff_voltage)
        self._running = True
        self.voltage_timeout = 0.5  # seconds to wait for a queued sample

    def stop(self):
        """Request the worker to finish at the next loop check."""
        self._running = False

    def _set_direction_on_measurement_thread(self, direction):
        """Best-effort: forward the current direction to the sampler."""
        try:
            mt = getattr(self.dev_manager, 'measurement_thread', None)
            if mt and hasattr(mt, 'set_direction'):
                mt.set_direction(direction)
        except Exception:
            logger.debug("Could not set direction on measurement thread", exc_info=True)

    def _set_simv_current(self, current):
        """Apply ``current`` via the device manager; re-raises on disconnect."""
        try:
            self.dev_manager.set_simv_current(current)
        except DeviceDisconnectedError:
            logger.error("Device disconnected during current set")
            raise

    def _hi_z(self):
        """Put the device in hi-Z; re-raises on disconnect."""
        try:
            self.dev_manager.hi_z()
        except DeviceDisconnectedError:
            logger.error("Device disconnected during hi-z")
            raise

    def get_latest_measurement(self):
        """Fetch one queued sample, waiting up to ``voltage_timeout`` seconds.

        Returns:
            ``(voltage, current, timestamp)``; all three are None when no
            usable sample is available.
        """
        try:
            q = self.dev_manager.measurement_thread.measurement_queue
            item = q.get(timeout=self.voltage_timeout)
            if isinstance(item, (tuple, list)):
                if len(item) >= 3 and isinstance(item[0], str):
                    _, v, i, *rest = item
                    ts = rest[0] if rest else None
                    return v, i, ts
                elif len(item) >= 2:
                    v = item[0]
                    i = item[1]
                    ts = item[2] if len(item) >= 3 else None
                    return v, i, ts
            return None, None, None
        except Empty:
            return None, None, None
        except Exception:
            # Polled several times per second, so this stays quiet on purpose.
            return None, None, None

    def run(self):
        """Charge until cutoff or stop(); always attempts hi-Z afterwards."""
        try:
            self._set_direction_on_measurement_thread(1)
            self._set_simv_current(abs(self.charge_current))
            while self._running:
                v, i, ts = self.get_latest_measurement()
                if v is None:
                    time.sleep(0.05)
                    continue
                if v >= self.cutoff_voltage:
                    break
                time.sleep(0.1)
            self.test_completed.emit()
        except Exception as e:
            self.error_occurred.emit(str(e))
        finally:
            try:
                self._hi_z()
            except Exception:
                # Cleanup must not raise, but a failed hi-Z is worth a trace.
                logger.exception("Failed to set hi-Z after charge")
            self.finished.emit()
class BatteryTester(QMainWindow):
def __init__(self):
    """Build the main window: state defaults, UI, device, status timer."""
    # Initialise the Qt base class before touching any attribute.
    super().__init__()
    self.plot_mutex = threading.Lock()
    self.devices = {}
    self.active_device = None
    self.last_logged_phase = None
    self.global_recording = False
    self.debug_counter = 0
    self.last_debug_time = time.time()
    # Defined before the UI and device are set up so that any slot fired
    # during setup already finds it.
    self.current_mode = "Live Monitoring"
    # Color scheme - MUST BE DEFINED FIRST
    self.bg_color = "#2E3440"
    self.fg_color = "#D8DEE9"
    self.accent_color = "#5E81AC"
    self.warning_color = "#BF616A"
    self.success_color = "#A3BE8C"
    # Status colors - MUST BE DEFINED BEFORE init_device()
    self.status_colors = {
        "connected": "green",
        "disconnected": "red",
        "error": "orange",
        "active": self.accent_color,
        "warning": self.warning_color
    }
    # Device and measurement state
    self.session_active = False
    self.measuring = False
    self.test_running = False
    self.continuous_mode = False
    self.request_stop = False
    self.interval = 0.1
    self.log_dir = os.path.expanduser("~/adalm1000/logs")
    os.makedirs(self.log_dir, exist_ok=True)
    # Data buffers
    self.aggregation_buffer = {
        'time': [], 'voltage': [], 'current': [],
        'count': 0, 'last_plot_time': 0
    }
    self.phase_data = deque()
    self.downsample_factor = 1  # initially no downsampling
    self.downsample_counter = 0
    # Initialize all measurement variables
    self.capacity_ah = 0.0
    self.energy = 0.0
    self.charge_capacity = 0.0
    self.coulomb_efficiency = 0.0
    self.cycle_count = 0
    self.start_time = time.time()
    self.last_update_time = self.start_time
    self.capacity = 1.0
    self.c_rate = 0.1
    self.charge_cutoff = 1.43
    self.discharge_cutoff = 0.01
    self.rest_time = 0.5
    # Initialize UI and device
    self.setup_ui()
    self.init_device()
    self.change_mode(self.current_mode)
    # Set window properties
    self.setWindowTitle("ADALM1000 - Battery Tester (Multi-Mode)")
    self.resize(1000, 800)
    self.setMinimumSize(800, 700)
    # Status update timer
    self.status_timer = QTimer()
    self.status_timer.timeout.connect(self.update_status_and_plot)
    self.status_timer.start(1000)  # every second
def print_device_status(self):
    """Dump a short status report for every known device to stdout."""
    for serial, device in self.devices.items():
        print(f"\nDevice: {serial}")
        is_active = device == self.active_device
        print(f"Active: {is_active}")
        started = getattr(device, 'start_time', 'NOT SET')
        print(f"Start Time: {started}")
        point_count = len(device.time_data)
        print(f"Data Points: {point_count}")
        if device.voltage_data:
            last_voltage = device.voltage_data[-1]
        else:
            last_voltage = 'NONE'
        print(f"Last Voltage: {last_voltage}")
        if hasattr(device, 'measurement_thread'):
            thread_state = device.measurement_thread.isRunning()
        else:
            thread_state = 'NO THREAD'
        print(f"Thread Running: {thread_state}")
def setup_ui(self):
    """Configure the user interface with all elements properly organized.

    Builds, top to bottom: the mode/device selector bar, the header with the
    connection indicator, the 4x2 measurement read-out grid, the parameter
    form with its buttons, the plot (via ``setup_plot``) and the status bar.
    Every widget that other methods use is stored on ``self``.

    The multi-line style-sheet literals are kept flush-left so their text
    stays exactly as before.
    """
    # Main widget and layout
    self.central_widget = QWidget()
    self.setCentralWidget(self.central_widget)
    self.main_layout = QVBoxLayout(self.central_widget)
    self.main_layout.setContentsMargins(8, 8, 8, 8)
    self.main_layout.setSpacing(8)
    # Base style for consistent sizing (bare declarations, no selector)
    base_style = f"""
font-size: 10pt;
color: {self.fg_color};
"""
    # Mode and device selection frame
    mode_frame = QFrame()
    mode_frame.setFrameShape(QFrame.StyledPanel)
    # NOTE(review): "{base_style}" below is an f-string placeholder, so the
    # emitted rule reads "QLabel font-size: ..." without braces. Confirm that
    # Qt accepts this sheet. The same pattern is used for display_frame and
    # params_frame further down.
    mode_frame.setStyleSheet(f"""
QFrame {{
border: 1px solid {self.accent_color};
border-radius: 5px;
padding: 5px;
}}
QLabel {base_style}
""")
    mode_layout = QHBoxLayout(mode_frame)
    mode_layout.setContentsMargins(5, 2, 5, 2)
    # Test mode selection
    self.mode_label = QLabel("Test Mode:")
    mode_layout.addWidget(self.mode_label)
    self.mode_combo = QComboBox()
    # Items are added before the signal is connected, so filling the combo
    # does not call change_mode().
    self.mode_combo.addItems(["Live Monitoring", "Discharge Test", "Charge Test", "Cycle Test"])
    self.mode_combo.setStyleSheet(f"""
QComboBox {{
{base_style}
background-color: #3B4252;
border: 1px solid #4C566A;
border-radius: 3px;
padding: 2px;
min-height: 24px;
}}
""")
    self.mode_combo.currentTextChanged.connect(self.change_mode)
    mode_layout.addWidget(self.mode_combo, 1)
    # Device selection
    self.device_label = QLabel("Device:")
    mode_layout.addWidget(self.device_label)
    self.device_combo = QComboBox()
    self.device_combo.setStyleSheet(f"""
QComboBox {{
{base_style}
background-color: #3B4252;
border: 1px solid #4C566A;
border-radius: 3px;
padding: 2px;
min-height: 24px;
}}
""")
    self.device_combo.currentIndexChanged.connect(self.change_device)
    mode_layout.addWidget(self.device_combo, 1)
    self.main_layout.addWidget(mode_frame)
    # Header area
    header_frame = QFrame()
    header_frame.setFrameShape(QFrame.NoFrame)
    header_layout = QHBoxLayout(header_frame)
    header_layout.setContentsMargins(0, 0, 0, 0)
    self.title_label = QLabel("ADALM1000 Battery Tester")
    self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};")
    header_layout.addWidget(self.title_label, 1)
    # Status indicator (starts red, next to the "Disconnected" label)
    self.status_light = QLabel()
    self.status_light.setFixedSize(16, 16)
    self.status_light.setStyleSheet("background-color: red; border-radius: 8px;")
    header_layout.addWidget(self.status_light)
    self.connection_label = QLabel("Disconnected")
    self.connection_label.setStyleSheet(base_style)
    header_layout.addWidget(self.connection_label)
    # Reconnect button
    self.reconnect_btn = QPushButton("Reconnect")
    self.reconnect_btn.setStyleSheet(f"""
{base_style}
background-color: #4C566A;
padding: 2px 8px;
border-radius: 3px;
min-height: 24px;
""")
    self.reconnect_btn.clicked.connect(self.reconnect_device)
    header_layout.addWidget(self.reconnect_btn)
    self.main_layout.addWidget(header_frame)
    # Measurement display - 4 rows x 2 columns
    display_frame = QFrame()
    display_frame.setFrameShape(QFrame.StyledPanel)
    display_frame.setStyleSheet(f"""
QFrame {{
border: 1px solid {self.accent_color};
border-radius: 5px;
padding: 3px;
}}
QLabel {base_style}
""")
    display_layout = QGridLayout(display_frame)
    display_layout.setHorizontalSpacing(5)
    display_layout.setVerticalSpacing(2)
    display_layout.setContentsMargins(5, 3, 5, 3)
    # Measurement fields in exact order
    measurement_fields = [
        ("Voltage", "V"), ("Current", "A"),
        ("Elapsed Time", "s"), ("Energy", "Wh"),
        ("Test Phase", None), ("Capacity", "Ah"),  # None for no unit
        ("Cycle Count", None), ("Coulomb Eff.", "%")
    ]
    for i, (label, unit) in enumerate(measurement_fields):
        row = i // 2
        col = (i % 2) * 2  # grid columns 0 and 2 are used
        # Container for each measurement with fixed height
        container = QWidget()
        container.setFixedHeight(24)  # Fixed row height
        container_layout = QHBoxLayout(container)
        container_layout.setContentsMargins(2, 0, 2, 0)
        container_layout.setSpacing(2)  # Minimal spacing between elements
        # Label (fixed width)
        lbl = QLabel(f"{label}:")
        lbl.setStyleSheet("min-width: 85px;")
        container_layout.addWidget(lbl)
        # Value field (fixed width)
        value_text = "0.000" if unit else ("Idle" if label == "Test Phase" else "0")
        value_lbl = QLabel(value_text)
        value_lbl.setAlignment(Qt.AlignRight)
        value_lbl.setStyleSheet("""
font-weight: bold;
min-width: 65px;
max-width: 65px;
""")
        container_layout.addWidget(value_lbl)
        # Unit (only if exists)
        if unit:
            unit_lbl = QLabel(unit)
            unit_lbl.setStyleSheet("min-width: 20px;")
            container_layout.addWidget(unit_lbl)
        display_layout.addWidget(container, row, col)
    # Assign references: item 1 of each container layout is the value label;
    # cells are visited row by row, matching measurement_fields order.
    widgets = [
        display_layout.itemAtPosition(r, c).widget().layout().itemAt(1).widget()
        for r in range(4) for c in [0, 2]
    ]
    (self.voltage_label, self.current_label,
     self.time_label, self.energy_label,
     self.phase_label, self.capacity_label,
     self.cycle_label, self.efficiency_label) = widgets
    self.main_layout.addWidget(display_frame)
    # Control area
    controls_frame = QFrame()
    controls_frame.setFrameShape(QFrame.NoFrame)
    controls_layout = QHBoxLayout(controls_frame)
    controls_layout.setContentsMargins(0, 0, 0, 0)
    # Parameters frame
    self.params_frame = QFrame()
    self.params_frame.setFrameShape(QFrame.StyledPanel)
    self.params_frame.setStyleSheet(f"""
QFrame {{
border: 1px solid {self.accent_color};
border-radius: 5px;
}}
QLabel {base_style}
QLineEdit {{
{base_style}
background-color: #3B4252;
border: 1px solid #4C566A;
border-radius: 3px;
padding: 2px;
min-height: 24px;
}}
""")
    self.params_layout = QGridLayout(self.params_frame)
    self.params_layout.setVerticalSpacing(3)
    self.params_layout.setHorizontalSpacing(5)
    self.params_layout.setContentsMargins(8, 5, 8, 5)
    # Add parameter inputs
    row = 0
    # Battery Capacity
    self.capacity_label_1 = QLabel("Capacity (Ah):")
    self.params_layout.addWidget(self.capacity_label_1, row, 0)
    self.capacity_input = QLineEdit("1.0")
    self.capacity_input.setValidator(QDoubleValidator(0.001, 100, 3))
    self.params_layout.addWidget(self.capacity_input, row, 1)
    row += 1
    # C-Rate
    self.c_rate_label = QLabel("C-Rate:")
    self.params_layout.addWidget(self.c_rate_label, row, 0)
    self.c_rate_input = QLineEdit("0.1")
    self.c_rate_input.setValidator(QDoubleValidator(0.01, 1, 2))
    self.params_layout.addWidget(self.c_rate_input, row, 1)
    row += 1
    # Charge Cutoff Voltage
    self.charge_cutoff_label = QLabel("Charge Cutoff (V):")
    self.params_layout.addWidget(self.charge_cutoff_label, row, 0)
    self.charge_cutoff_input = QLineEdit("1.43")
    self.charge_cutoff_input.setValidator(QDoubleValidator(0.1, 5.0, 3))
    self.params_layout.addWidget(self.charge_cutoff_input, row, 1)
    row += 1
    # Discharge Cutoff Voltage
    self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):")
    self.params_layout.addWidget(self.discharge_cutoff_label, row, 0)
    self.discharge_cutoff_input = QLineEdit("0.01")
    # The lower bound must admit the field's own default of 0.01 V; the
    # previous minimum of 0.1 excluded it.
    self.discharge_cutoff_input.setValidator(QDoubleValidator(0.0, 5.0, 3))
    self.params_layout.addWidget(self.discharge_cutoff_input, row, 1)
    row += 1
    # Rest Time
    self.rest_time_label = QLabel("Rest Time (h):")
    self.params_layout.addWidget(self.rest_time_label, row, 0)
    self.rest_time_input = QLineEdit("0.5")
    self.rest_time_input.setValidator(QDoubleValidator(0.1, 24, 1))
    self.params_layout.addWidget(self.rest_time_input, row, 1)
    row += 1
    # Test Conditions (free text, no validator)
    self.test_conditions_label = QLabel("Test Conditions:")
    self.params_layout.addWidget(self.test_conditions_label, row, 0)
    self.test_conditions_input = QLineEdit("Room Temperature")
    self.params_layout.addWidget(self.test_conditions_input, row, 1)
    controls_layout.addWidget(self.params_frame, 1)
    # Button frame
    button_frame = QFrame()
    button_frame.setFrameShape(QFrame.NoFrame)
    button_layout = QVBoxLayout(button_frame)
    button_layout.setContentsMargins(5, 0, 0, 0)
    button_layout.setSpacing(5)
    # Button style
    # NOTE(review): the callers below append bare declarations after these
    # rule blocks, and the sheet uses "!important". Confirm Qt accepts both.
    # apply_button_style() later replaces these sheets in any case.
    button_style = f"""
QPushButton {{
{base_style}
font-weight: bold;
padding: 4px 8px;
border-radius: 4px;
min-height: 28px;
color: {self.fg_color};
}}
QPushButton:checked {{
background-color: {self.warning_color} !important;
color: {self.fg_color} !important;
}}
QPushButton:disabled {{
background-color: #4C566A;
}}
"""
    # Single toggle button (Start/Stop)
    self.toggle_button = QPushButton("START")
    self.toggle_button.setCheckable(True)
    self.toggle_button.setStyleSheet(button_style + f"""
background-color: {self.success_color};
min-width: 120px;
""")
    self.toggle_button.clicked.connect(self.toggle_test)
    button_layout.addWidget(self.toggle_button)
    # Continuous mode checkbox (only for Cycle mode)
    self.continuous_mode_check = QCheckBox("Continuous Mode")
    self.continuous_mode_check.setChecked(True)
    self.continuous_mode_check.setStyleSheet(base_style)
    button_layout.addWidget(self.continuous_mode_check)
    self.continuous_mode_check.hide()
    # Record button for Live mode
    self.record_button = QPushButton("● Start Recording")
    self.record_button.setCheckable(True)
    # The former str.replace() of "background-color" by itself was a no-op
    # and has been dropped; the resulting sheet text is unchanged.
    self.record_button.setStyleSheet(
        button_style + f"background-color: {self.success_color};"
    )
    self.record_button.clicked.connect(self.toggle_global_recording)
    button_layout.addWidget(self.record_button)
    self.record_button.hide()
    controls_layout.addWidget(button_frame)
    self.main_layout.addWidget(controls_frame)
    # Plot area
    self.setup_plot()
    # Status bar
    self.status_bar = self.statusBar()
    self.status_bar.setStyleSheet(f"color: {self.fg_color}; font-size: 9pt;")
    self.status_bar.showMessage("Ready")
    # Apply dark theme
    self.setStyleSheet(f"""
QMainWindow {{
background-color: {self.bg_color};
}}
QWidget {{
{base_style}
}}
""")
def apply_button_style(self):
    """Restyle the toggle and record buttons from their checked state.

    A checked button gets the warning colour with white text; an unchecked
    one gets the success colour with the normal foreground colour. Only the
    toggle button receives a hover rule.
    """
    def sheet(background, text_color, with_hover):
        # Assemble the QPushButton rule; literal text is kept flush-left.
        css = f"""
QPushButton {{
background-color: {background};
color: {text_color};
font-weight: bold;
border: none;
border-radius: 4px;
padding: 4px 8px;
min-height: 28px;
}}
"""
        if with_hover:
            css += f"""QPushButton:hover {{
background-color: #{self.darker_color(background)};
}}
"""
        return css

    # Toggle button: red while running (Stop), green otherwise (Start)
    if self.toggle_button.isChecked():
        toggle_css = sheet(self.warning_color, "white", True)
    else:
        toggle_css = sheet(self.success_color, self.fg_color, True)
    self.toggle_button.setStyleSheet(toggle_css)

    # Record button is handled separately and has no hover rule
    if self.record_button.isChecked():
        record_css = sheet(self.warning_color, "white", False)
    else:
        record_css = sheet(self.success_color, self.fg_color, False)
    self.record_button.setStyleSheet(record_css)
def darker_color(self, hex_color):
    """Return ``hex_color`` darkened by 40 per channel, as 6 hex digits.

    The input may start with '#' or not; the result never includes it.
    Channels are clamped at 0.
    """
    prefixed = hex_color if hex_color.startswith('#') else '#' + hex_color
    channels = (int(prefixed[start:start + 2], 16) for start in (1, 3, 5))
    return ''.join(f"{max(0, value - 40):02x}" for value in channels)
def toggle_global_recording(self):
    """Toggle recording for all connected devices simultaneously.

    Starting marks every device as recording and, for devices whose
    measurement thread is not running, starts live monitoring. Stopping
    finalizes each device's log file and clears its recording flag.
    """
    self.global_recording = not self.global_recording
    if self.global_recording:
        # Start recording for all devices
        for device in self.devices.values():
            device.is_recording = True
            if not device.measurement_thread.isRunning():
                # Device objects are not guaranteed to offer
                # start_measurement(); call it only when present instead of
                # failing with AttributeError.
                start_measurement = getattr(device, 'start_measurement', None)
                if callable(start_measurement):
                    start_measurement(self.interval)
                self.start_live_monitoring(device)
        self.record_button.setText("■ Stop Recording")
        self.status_bar.showMessage("Recording started for all connected devices")
    else:
        # Stop recording for all devices
        for device in self.devices.values():
            self.finalize_device_log_file(device)
            device.is_recording = False
        self.record_button.setText("● Start Recording")
        self.status_bar.showMessage("Recording stopped for all devices")
    # Keep the checkable button in step with the flag; other code may have
    # changed its checked state independently.
    self.record_button.setChecked(self.global_recording)
    self.apply_button_style()
def safe_execute(func):
    """Decorator to catch and log exceptions in Qt event handlers.

    The wrapped callable returns whatever ``func`` returns. If ``func``
    raises, the error and traceback are printed and None is returned.
    """
    import functools

    # functools.wraps keeps func's __name__/__doc__ on the wrapper.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print(f"Error in {func.__name__}: {str(e)}")
            traceback.print_exc()
    return wrapper
@safe_execute
def toggle_test(self, checked):
if checked:
self.start_test()
else:
self.stop_test()
self.apply_button_style()
def change_mode(self, mode_name):
    """Switch the window to ``mode_name`` and reset test state and displays.

    Stops any running test, shows only the widgets relevant to the mode,
    resets the toggle button, clears the active device's data and the
    read-outs, resets the plot and updates the record button.
    """
    self.current_mode = mode_name
    self.stop_test()  # Stop any current operation

    is_cycle = mode_name == "Cycle Test"
    is_live = mode_name == "Live Monitoring"

    # Mode-specific parameter rows
    uses_charge = mode_name in ["Cycle Test", "Charge Test"]
    uses_discharge = mode_name in ["Cycle Test", "Discharge Test"]
    self.charge_cutoff_label.setVisible(uses_charge)
    self.charge_cutoff_input.setVisible(uses_charge)
    self.discharge_cutoff_label.setVisible(uses_discharge)
    self.discharge_cutoff_input.setVisible(uses_discharge)
    self.rest_time_label.setVisible(is_cycle)
    self.rest_time_input.setVisible(is_cycle)
    # Continuous mode checkbox only for cycle test
    self.continuous_mode_check.setVisible(is_cycle)
    # Record button only for live monitoring
    self.record_button.setVisible(is_live)

    # Caption of the start button per test mode; hidden in live monitoring
    captions = {
        "Cycle Test": "START CYCLE TEST",
        "Discharge Test": "START DISCHARGE",
        "Charge Test": "START CHARGE",
    }
    if mode_name in captions:
        self.toggle_button.setText(captions[mode_name])
        self.toggle_button.show()
    elif is_live:
        self.toggle_button.hide()

    # Reset button state
    self.toggle_button.setChecked(False)
    self.toggle_button.setEnabled(True)
    self.apply_button_style()

    # Reset measurement state and zero the time
    if self.active_device:
        active = self.active_device
        active.reset_data()
        if hasattr(active, 'measurement_thread'):
            active.measurement_thread.start_time = time.time()

    # Reset UI displays
    self.capacity_label.setText("0.0000")
    self.energy_label.setText("0.0000")
    self.cycle_label.setText("0")
    self.phase_label.setText("Idle")
    # Reset plot
    self.reset_plot()
    self.status_bar.showMessage(f"Mode changed to {mode_name}")

    # Update recording button
    if not is_live:
        self.record_button.setVisible(False)
    else:
        self.record_button.setVisible(True)
        recording = self.active_device and self.active_device.is_recording
        if recording:
            self.record_button.setChecked(True)
            self.record_button.setText("Stop Recording")
        else:
            self.record_button.setChecked(False)
            self.record_button.setText("Start Recording")
    self.apply_button_style()
    self.status_bar.showMessage(f"Mode changed to {mode_name}")
def reset_test(self):
    """Clear the active device's data and zero the result read-outs.

    Does nothing when no device is active.
    """
    device = self.active_device
    if not device:
        return

    device.reset_data()  # Reset in DeviceManager

    # Put every result label back to its idle text.
    idle_texts = (
        (self.capacity_label, "0.0000"),
        (self.energy_label, "0.0000"),
        (self.cycle_label, "0"),
        (self.phase_label, "Idle"),
    )
    for label, text in idle_texts:
        label.setText(text)
def toggle_recording(self):
    """Toggle data recording in Live Monitoring mode.

    When the active device is not recording, a log file is created via
    ``create_cycle_log_file`` and the device is flagged as recording; when
    it is recording, the current log file is finalized and the flag is
    cleared. The record button is kept in sync with the resulting state,
    including when there is no device or an operation fails.
    """
    dev = self.active_device
    if not dev:
        # Nothing to record from: undo the click so the button does not
        # claim that recording is active.
        self.record_button.setChecked(False)
        return

    if not dev.is_recording:  # Use device's recording state
        try:
            if self.create_cycle_log_file():
                dev.is_recording = True  # Set device's recording state
                self.record_button.setText("Stop Recording")
                self.apply_button_style()  # Update style
                self.status_bar.showMessage("Live recording started")
                # Ensure monitoring is running.
                # NOTE(review): start_live_monitoring returns early once
                # is_recording is True, so this call looks like a no-op —
                # confirm the intended behaviour.
                if not self.test_running:
                    self.start_live_monitoring(self.active_device)
            else:
                self.record_button.setChecked(False)
                self.current_cycle_file = None
        except Exception as e:
            logger.exception("Error starting recording")
            self.record_button.setChecked(False)
            self.current_cycle_file = None
            QMessageBox.critical(self, "Error", f"Failed to start recording:\n{str(e)}")
    else:
        # Stop recording
        try:
            if getattr(self, 'current_cycle_file', None):
                self.finalize_log_file()
            dev.is_recording = False  # Clear device's recording state
            self.record_button.setText("Start Recording")
            self.apply_button_style()  # Update style
            self.status_bar.showMessage("Live recording stopped")
        except Exception:
            logger.exception("Error stopping recording")
            # Show the state the device is really in.
            self.record_button.setChecked(dev.is_recording)
def handle_continuous_mode_change(self, state):
    """React to the continuous-mode checkbox being cleared mid-test.

    Args:
        state: New checkbox state; only a falsy value (unchecked) while a
            test is running triggers any action.
    """
    if state or not self.test_running:
        return

    self.status_bar.showMessage("Continuous mode disabled - will complete current cycle")
    self.continuous_mode_check.setStyleSheet(f"color: {self.warning_color};")

    def restore_colour():
        # Evaluated when the timer fires, so it uses the colour set then.
        self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")

    # Highlight the checkbox for two seconds, then restore it.
    QTimer.singleShot(2000, restore_colour)
def setup_plot(self):
    """Build the voltage/current figure and embed it in the main layout.

    Creates ``self.fig``, the voltage axis ``self.ax`` with its twin
    current axis ``self.ax2``, the two line artists ``self.line_voltage``
    and ``self.line_current``, and the Qt canvas ``self.canvas``.
    """
    fig = self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color)
    fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
    volt_axis = self.ax = fig.add_subplot(111)
    volt_axis.set_facecolor('#3B4252')

    # Initial voltage window: 0.9 V .. 1.43 V widened by the padding
    v_pad = 0.2
    volt_axis.set_ylim(max(0, 0.9 - v_pad), 1.43 + v_pad)

    # Voltage trace on the left axis
    volt_colour = '#00BFFF'
    self.line_voltage, = volt_axis.plot(
        [0], [0], color=volt_colour, label='Voltage (V)', linewidth=2)
    volt_axis.set_ylabel("Voltage (V)", color=volt_colour)
    volt_axis.tick_params(axis='y', labelcolor=volt_colour)

    # Current trace on the right axis, symmetric around zero
    amp_axis = self.ax2 = volt_axis.twinx()
    i_pad = 0.05
    i_span = (0.1 * 0.2) * 1.5  # Default test current with 50 % headroom
    amp_axis.set_ylim(-i_span - i_pad, i_span + i_pad)
    self.line_current, = amp_axis.plot([0], [0], 'r-', label='Current (A)', linewidth=2)
    amp_axis.set_ylabel("Current (A)", color='r')
    amp_axis.tick_params(axis='y', labelcolor='r')

    # Shared decorations
    text_colour = self.fg_color
    volt_axis.set_xlabel('Time (s)', color=text_colour)
    volt_axis.set_title('Battery Test', color=text_colour)
    volt_axis.tick_params(axis='x', colors=text_colour)
    volt_axis.grid(True, color='#4C566A')

    # One legend per axis, in opposite corners
    volt_axis.legend(loc='upper left')
    amp_axis.legend(loc='upper right')

    # Embed the figure in the Qt layout
    canvas = self.canvas = FigureCanvas(fig)
    canvas.setStyleSheet(f"background-color: {self.bg_color};")
    self.main_layout.addWidget(canvas, 1)
def init_device(self):
    """Scan for ADALM1000 boards and register a DeviceManager for each.

    Ends any previous session, opens a new one, and rescans up to three
    more times (one second apart) while no device is found. Every device
    found gets a ``DeviceManager`` whose measurement-thread signals are
    connected exactly once; the first device becomes the active one. Any
    failure is logged and reported through ``handle_device_connection``
    instead of being raised.
    """
    try:
        # Close existing session (best effort)
        previous = getattr(self, 'session', None)
        if previous is not None:
            try:
                previous.end()
            except Exception:
                logger.debug("Ignoring error while ending previous session", exc_info=True)

        # Create new session
        self.session = Session(ignore_dataflow=True, queue_size=10000)
        self.session.scan()

        # Retry mechanism
        retry_count = 0
        while not self.session.devices and retry_count < 3:
            self.handle_device_connection(False, f"Scanning... (Attempt {retry_count+1}/3)")
            QApplication.processEvents()
            time.sleep(1)
            self.session.scan()
            retry_count += 1
        if not self.session.devices:
            self.handle_device_connection(False, "No devices found")
            return

        self.devices = {}
        self.active_device = None
        for dev in self.session.devices:
            serial = dev.serial if hasattr(dev, 'serial') else str(dev[0])
            logger.info(f"Found device: {serial}")
            manager = DeviceManager(dev=dev, serial=serial)
            self.devices[serial] = manager
            # Connect signals for all devices. This is the only place they
            # are connected here: connecting the active device a second
            # time would deliver every sample twice.
            manager.measurement_thread.update_signal.connect(self.update_measurements)
            manager.measurement_thread.error_signal.connect(self.handle_device_error)

        # Select first device
        first_serial = next(iter(self.devices))
        self.active_device = self.devices[first_serial]

        # Update UI
        self.device_combo.clear()
        for serial in self.devices:
            self.device_combo.addItem(serial)
        self.device_combo.setCurrentText(first_serial)
        self.session_active = True
        self.handle_device_connection(True, f"Connected: {first_serial}")
        self.toggle_button.setEnabled(True)

        # Keep a shortcut to the active device's measurement thread and
        # make sure it is running, mirroring what change_device does.
        # NOTE(review): assumes nothing else is responsible for starting
        # the thread at this point — confirm against DeviceManager.
        self.measurement_thread = self.active_device.measurement_thread
        if not self.measurement_thread.isRunning():
            self.measurement_thread.start()

        # Initialize recording state
        self.global_recording = False
        self.record_button.setChecked(False)
        self.record_button.setText("Start Recording")
    except Exception as e:
        logger.exception("Device initialization failed")
        self.handle_device_connection(False, f"Initialization failed: {str(e)}")
def handle_no_devices(self):
    """Put the UI into its "no hardware" state and offer a reconnect."""
    # No session and no active device from here on
    self.session_active, self.active_device = False, None

    self.status_bar.showMessage("No ADALM1000 devices found")
    self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")

    # Nothing can be started or selected without a device
    self.toggle_button.setEnabled(False)
    self.device_combo.clear()

    # Show reconnect button
    reconnect = self.reconnect_btn
    reconnect.setEnabled(True)
    reconnect.setVisible(True)
def handle_device_connection(self, connected, message=None):
    """Update the connection label and status light.

    Args:
        connected: True when a device is connected.
        message: Text for the connection label. Defaults to "Connected" or
            "Disconnected". When not connected, a message containing
            "fail" (case-insensitive) selects the "error" colour.
    """
    if connected:
        status = "connected"
        message = message or "Connected"
    else:
        # Apply the default before inspecting the text; otherwise calling
        # this without a message would fail on None.lower().
        message = message or "Disconnected"
        status = "error" if "fail" in message.lower() else "disconnected"

    color = self.status_colors.get(status, "orange")
    self.connection_label.setText(message)
    self.status_light.setStyleSheet(f"""
background-color: {color};
border-radius: 10px;
min-width: 12px;
max-width: 12px;
min-height: 12px;
max-height: 12px;
""")
    # Repaint immediately so the status is visible during blocking scans
    QApplication.processEvents()
def check_connection(self):
    """Report a disconnect and try to reconnect when no device is present."""
    session_has_devices = hasattr(self, 'session') and self.session.devices
    if session_has_devices:
        return
    self.handle_device_error("Device disconnected")
    self.reconnect_device()
def request_usb_permissions(self):
    """Ask the user how to resolve missing USB permissions and act on it.

    Offers three choices: restart the application under ``sudo``, install
    a udev rule for the ADALM1000 and reload udev, or cancel. If writing
    the rule or reloading udev fails, the equivalent manual commands are
    shown instead of a success message.
    """
    msg = QMessageBox(self)
    msg.setIcon(QMessageBox.Critical)
    msg.setWindowTitle("USB Permission Required")
    msg.setText("Permission needed to access ADALM1000 devices")
    msg.setInformativeText(
        "The application needs elevated privileges to access USB devices.\n\n"
        "Please choose an option:"
    )
    # Add buttons
    sudo_button = msg.addButton("Run as Administrator", QMessageBox.ActionRole)
    udev_button = msg.addButton("Fix Permissions", QMessageBox.ActionRole)
    msg.addButton(QMessageBox.Cancel)
    msg.exec_()

    clicked = msg.clickedButton()
    if clicked == sudo_button:
        # Restart with sudo; execvp replaces the current process.
        QMessageBox.information(self, "Restarting",
                                "The application will restart with administrator privileges")
        os.execvp("sudo", ["sudo", sys.executable, *sys.argv])
    elif clicked == udev_button:
        # Create udev rule
        rule_content = (
            '# ADALM1000 USB permissions\n'
            'SUBSYSTEM=="usb", ATTR{idVendor}=="064b", ATTR{idProduct}=="784c", MODE="0666"\n'
        )
        rule_path = "/etc/udev/rules.d/52-adalm1000.rules"
        try:
            with open(rule_path, "w") as f:
                f.write(rule_content)
            # check=True turns a failing udevadm into an exception so that
            # success is only reported when the rules were really reloaded.
            subprocess.run(["sudo", "udevadm", "control", "--reload-rules"], check=True)
            subprocess.run(["sudo", "udevadm", "trigger"], check=True)
        except (OSError, subprocess.CalledProcessError) as e:
            QMessageBox.critical(self, "Error",
                                 f"Failed to set permissions: {str(e)}\n\n"
                                 "Please run these commands manually:\n\n"
                                 f"echo '{rule_content}' | sudo tee {rule_path}\n"
                                 "sudo udevadm control --reload-rules\n"
                                 "sudo udevadm trigger")
        else:
            QMessageBox.information(self, "Permissions Fixed",
                                    "USB permissions configured. Please reconnect devices.")
def manual_device_init(self):
    """Populate the UI with two simulated devices as a fallback.

    Marks the session as active and enables the start button without
    touching any hardware, then warns the user that simulation is in use.
    Any error is printed rather than raised.
    """
    try:
        # Simulate device detection
        combo = self.device_combo
        combo.clear()
        for simulated_name in ("ADALM1000-1 (Simulated)", "ADALM1000-2 (Simulated)"):
            combo.addItem(simulated_name)

        # Mock connection
        self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")
        self.connection_label.setText("Simulated Devices")
        self.session_active = True
        self.toggle_button.setEnabled(True)

        QMessageBox.warning(
            self,
            "Simulation Mode",
            "Using simulated devices - real hardware not detected",
        )
    except Exception as e:
        print(f"Manual init failed: {e}")
@safe_execute
def change_device(self, index):
    """Handle switching between connected devices.

    Detaches the previously active device's measurement signals, makes
    the device selected in the combo box active, ensures its measurement
    thread exists, is connected exactly once and is running, hands over
    the recording state, and refreshes the UI.

    Args:
        index: Combo-box index of the selected device. Negative values,
            an inactive session, or an unknown serial are ignored.
    """
    if not self.session_active or index < 0:
        return
    serial = self.device_combo.itemText(index)
    if serial not in self.devices:
        return

    handlers = (
        ('update_signal', self.update_measurements),
        ('error_signal', self.handle_device_error),
    )

    def _disconnect_quietly(thread, signal_name, slot):
        """Disconnect ``slot``; a missing connection or dead thread is fine."""
        try:
            getattr(thread, signal_name).disconnect(slot)
        except (TypeError, RuntimeError):
            # Signals weren't connected or already disconnected - safe to ignore
            pass

    # Detach the old device's measurement thread from the UI handlers
    old_device = self.active_device
    if old_device and old_device.measurement_thread:
        with self.plot_mutex:
            for signal_name, slot in handlers:
                _disconnect_quietly(old_device.measurement_thread, signal_name, slot)

    # Activate new device
    dev = self.active_device = self.devices[serial]

    # Ensure measurement thread exists. DeviceManager always defines the
    # attribute (initially None), so test the value, not its existence.
    if getattr(dev, 'measurement_thread', None) is None:
        dev.measurement_thread = MeasurementThread(dev, self.interval, dev)
    new_thread = dev.measurement_thread

    # Connect signals for new device. Any existing connection is dropped
    # first so that repeated switches do not stack duplicates.
    with self.plot_mutex:
        for signal_name, slot in handlers:
            _disconnect_quietly(new_thread, signal_name, slot)
            getattr(new_thread, signal_name).connect(slot)

    # Start measurement if not running
    if not new_thread.isRunning():
        new_thread.start()

    # Initialize device time tracking if needed
    if not hasattr(dev, 'start_time'):
        dev.start_time = time.time()

    # Update UI with current device data
    self.update_ui_from_active_device()

    # Handle recording state transition: finalize the old device's log
    # once, independently of whether the signal disconnect succeeded.
    if old_device and old_device.is_recording:
        self.finalize_device_log_file(old_device)

    # Set up recording for new device if global recording is enabled
    if self.global_recording and not dev.is_recording:
        self.start_live_monitoring(dev)

    # Update recording button state
    self.record_button.setChecked(dev.is_recording)
    self.record_button.setText("■ Stop Recording" if dev.is_recording else "● Start Recording")
    self.apply_button_style()

    # Reset plot for new device
    self.reset_plot()

    # Update status
    self.status_bar.showMessage(f"Switched to device: {serial}")
    self.set_connection_status(f"Connected: {serial}", self.status_colors["connected"])
def update_ui_from_active_device(self):
"""Refresh the plot lines and read-out labels from the active device.

Does nothing when no device is active.
"""
dev = self.active_device
if not dev:
return
with self.plot_mutex:
# Copy the current display data
x = list(dev.display_time_data)
y_v = list(dev.display_voltage_data)
y_c = list(dev.display_current_data)
# Update the plot
self.line_voltage.set_data(x, y_v)
self.line_current.set_data(x, y_c)
self.auto_scale_axes()
self.canvas.draw_idle()
# Update the labels from the most recent sample
if dev.voltage_data:
v = dev.voltage_data[-1]
i = dev.current_data[-1]
t = dev.time_data[-1] if dev.time_data else 0
self.voltage_label.setText(f"{v:.4f}")
# The current is displayed as a magnitude (sign dropped)
self.current_label.setText(f"{abs(i):.4f}")
self.time_label.setText(self.format_time(t))
self.capacity_label.setText(f"{dev.capacity_ah:.4f}")
self.energy_label.setText(f"{dev.energy:.4f}")
self.cycle_label.setText(str(dev.cycle_count))
self.phase_label.setText(dev.test_phase)
@safe_execute
@pyqtSlot(str, float, float, float)
def update_measurements(self, serial, voltage, current, current_time):
"""Store one measurement sample and refresh the UI for the active device.

Args:
    serial: Key of the originating device in ``self.devices``.
    voltage: Measured voltage; samples outside 0..5.0 are discarded.
    current: Measured current; samples outside -0.2..0.2 are discarded.
    current_time: Timestamp of the sample; the stored time is this value
        minus the device's ``start_time``.

Any exception is logged and its traceback printed instead of being raised.
"""
try:
device = self.devices.get(serial)
if not device:
logger.error(f"Device {serial} not found")
return
# Ensure timing is initialized
if not hasattr(device, 'start_time') or device.start_time is None:
device.start_time = current_time
logger.debug(f"Initialized start time for {serial}")
# Calculate elapsed time safely
try:
elapsed = current_time - device.start_time
except TypeError:
# A non-numeric start_time is replaced and the sample gets elapsed 0
logger.error(f"Invalid timing - current: {current_time}, start: {device.start_time}")
device.start_time = current_time
elapsed = 0
# Validate measurements; out-of-range samples are logged and dropped
if not (0 <= voltage <= 5.0) or not (-0.2 <= current <= 0.2):
logger.warning(f"Invalid values - V: {voltage:.3f}, I: {current:.3f}")
return
# Update data buffers
device.time_data.append(elapsed)
device.voltage_data.append(voltage)
device.current_data.append(current)
# Update display buffers
device.display_time_data.append(elapsed)
device.display_voltage_data.append(voltage)
device.display_current_data.append(current)
# Trim display buffers if needed (oldest point dropped beyond 1000)
if len(device.display_time_data) > 1000:
device.display_time_data.popleft()
device.display_voltage_data.popleft()
device.display_current_data.popleft()
# Only update UI for active device
if device == self.active_device:
self.voltage_label.setText(f"{voltage:.4f}")
self.current_label.setText(f"{abs(current):.4f}")
self.time_label.setText(self.format_time(elapsed))
# Calculate metrics if we have enough data
if len(device.time_data) > 1:
# Integrate over the interval between the last two samples;
# the /3600 presumably converts seconds to hours — TODO confirm units
delta_t = device.time_data[-1] - device.time_data[-2]
device.capacity_ah += abs(current) * delta_t / 3600
device.energy += (voltage * abs(current)) * delta_t / 3600
self.capacity_label.setText(f"{device.capacity_ah:.4f}")
self.energy_label.setText(f"{device.energy:.4f}")
# Force plot update
self.update_plot()
except Exception as e:
logger.error(f"Critical error in update_measurements: {str(e)}")
traceback.print_exc()
def adjust_downsampling(self):
    """Adapt the downsample factor to the number of buffered points.

    The factor is doubled (capped at 64) when the buffer exceeds 1.5x
    ``max_points_to_keep`` and halved (floored at 1) when it drops below
    half of it. A status message is shown only when the factor changes.
    """
    n_points = len(self.time_data)
    factor = self.downsample_factor

    if n_points > self.max_points_to_keep * 1.5:
        # Grow exponentially, but never beyond 64
        candidate = min(64, max(1, factor * 2))
    elif n_points < self.max_points_to_keep // 2:
        # Halve, but never below 1
        candidate = max(1, factor // 2)
    else:
        return

    if candidate == factor:
        return
    self.downsample_factor = candidate
    self.status_bar.showMessage(
        f"Downsampling: Factor {self.downsample_factor}", 2000)
def update_status_and_plot(self):
    """Refresh the status read-outs first, then redraw the plot."""
    for refresh in (self.update_status, self.update_plot):
        refresh()
def update_status(self):
    """Update capacity/energy, write log rows and refresh the read-outs.

    For every device that has data, the interval between the last two
    samples is added to its capacity and energy, and, if the device is
    recording, one CSV row is written at most once per second. The
    labels are then refreshed from the active device.
    """
    now = time.time()

    # Update all devices
    for dev_manager in self.devices.values():
        # Only process if device has data
        if not dev_manager.time_data:
            continue

        # Computed for every device with data so the log row below never
        # depends on a value left over from another device or on a name
        # that was never bound (single-sample case).
        current_val = abs(dev_manager.current_data[-1])
        power = dev_manager.voltage_data[-1] * current_val

        # Update capacity calculation
        # NOTE(review): update_measurements also integrates capacity and
        # energy per sample — confirm the two do not double count.
        if len(dev_manager.time_data) > 1:
            delta_t = dev_manager.time_data[-1] - dev_manager.time_data[-2]
            dev_manager.capacity_ah += current_val * delta_t / 3600
            dev_manager.energy += power * delta_t / 3600

        # Write to log if recording (at most one row per second)
        if dev_manager.is_recording and dev_manager.log_writer:
            try:
                if now - getattr(dev_manager, '_last_log_time', 0) >= 1.0:
                    dev_manager.log_writer.writerow([
                        f"{dev_manager.time_data[-1]:.4f}",
                        f"{dev_manager.voltage_data[-1]:.6f}",
                        f"{dev_manager.current_data[-1]:.6f}",
                        "Live",
                        f"{dev_manager.capacity_ah:.4f}",
                        f"{power:.4f}",
                        f"{dev_manager.energy:.4f}"
                    ])
                    dev_manager.log_file.flush()
                    dev_manager._last_log_time = now
            except Exception as e:
                print(f"Log write error for device {dev_manager.serial}: {e}")
                # Stop recording rather than failing on every tick
                dev_manager.is_recording = False

    # Update UI for active device, including the elapsed-time display
    dev = self.active_device
    if dev and dev.time_data:
        self.capacity_label.setText(f"{dev.capacity_ah:.4f}")
        self.energy_label.setText(f"{dev.energy:.4f}")
        self.time_label.setText(self.format_time(dev.time_data[-1]))
@safe_execute
def start_test(self):
    """Start the selected test mode using the active device.

    Resets the measurement thread, data buffers and counters of the
    active device, puts both channels into HI_Z, validates the capacity
    and C-rate inputs, opens the log file and finally launches the worker
    matching ``self.current_mode`` on its own ``QThread``. Every failure
    is reported in a message box and leaves the UI in the stopped state.
    """
    if not self.active_device:
        QMessageBox.warning(self, "No Device", "No ADALM1000 device selected.")
        self.toggle_button.setChecked(False)
        self.apply_button_style()
        return
    dev_manager = self.active_device
    dev = dev_manager.dev

    # Clean up any previous test
    self.cleanup_test_threads()
    # Reset test state for active device
    self.reset_test()

    # Reset measurement thread timing and drop queued samples
    m_thread = dev_manager.measurement_thread
    m_thread.start_time = time.time()
    m_thread.voltage_window.clear()
    m_thread.current_window.clear()
    with m_thread.measurement_queue.mutex:
        m_thread.measurement_queue.queue.clear()
    self.time_label.setText("00:00:00")

    # Reset data buffers
    for buffer in (
        dev_manager.time_data,
        dev_manager.voltage_data,
        dev_manager.current_data,
        dev_manager.display_time_data,
        dev_manager.display_voltage_data,
        dev_manager.display_current_data,
    ):
        buffer.clear()
    dev_manager.aggregation_buffer = {
        'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0
    }

    # Reset device state
    try:
        dev.channels['A'].mode = pysmu.Mode.HI_Z
        dev.channels['A'].constant(0)
        dev.channels['B'].mode = pysmu.Mode.HI_Z
        dev.channels['B'].constant(0)
    except Exception as e:
        QMessageBox.critical(self, "Device Error", f"Failed to reset device: {e}")
        self.toggle_button.setChecked(False)
        self.apply_button_style()
        return

    # Reset test variables
    dev_manager.capacity_ah = 0.0
    dev_manager.energy = 0.0
    dev_manager.charge_capacity = 0.0
    dev_manager.coulomb_efficiency = 0.0
    dev_manager.cycle_count = 0
    dev_manager.start_time = time.time()
    dev_manager.test_phase = "Running"

    # Set global state
    self.test_running = True
    self.request_stop = False

    # Update UI
    self.phase_label.setText(dev_manager.test_phase)
    self.toggle_button.setText("STOP")
    self.apply_button_style()

    # Get parameters from UI
    try:
        self.capacity = float(self.capacity_input.text())
        self.c_rate = float(self.c_rate_input.text())
        # Reject zero/negative inputs, which would otherwise pass the
        # upper-limit check below and yield a non-positive test current.
        if self.capacity <= 0:
            raise ValueError("Battery capacity must be positive")
        if self.c_rate <= 0:
            raise ValueError("C-rate must be positive")
        test_current = self.c_rate * self.capacity
        if test_current > 0.2:
            raise ValueError("Current must be ≤ 200mA (0.2A) for ADALM1000")
    except ValueError as e:
        QMessageBox.critical(self, "Input Error", str(e))
        self.stop_test()
        return

    # Create log file - using start_live_monitoring instead of _start_device_recording
    if not self.start_live_monitoring(self.active_device):
        self.stop_test()
        return

    def _launch(worker, thread, phase_slot=None):
        """Move ``worker`` to ``thread``, wire its signals and start it."""
        worker.moveToThread(thread)
        if phase_slot is not None:
            worker.update_phase.connect(phase_slot)
        worker.update_status.connect(self.status_bar.showMessage)
        worker.test_completed.connect(self.finalize_test)
        worker.error_occurred.connect(self.handle_test_error)
        # Tear-down chain: worker finished -> thread quits -> both deleted
        worker.finished.connect(thread.quit)
        worker.finished.connect(worker.deleteLater)
        thread.finished.connect(thread.deleteLater)
        thread.start()
        # NOTE(review): the single-shot timer is created in the calling
        # thread — confirm worker.run really executes in ``thread``
        # (thread.started.connect(worker.run) is the usual pattern).
        QTimer.singleShot(0, worker.run)

    mode = self.current_mode
    if mode == "Live Monitoring":
        # Ensure we pass the device parameter
        self.start_live_monitoring(self.active_device)
        return

    # Start the appropriate test
    try:
        if mode == "Cycle Test":
            self.charge_cutoff = float(self.charge_cutoff_input.text())
            self.discharge_cutoff = float(self.discharge_cutoff_input.text())
            self.rest_time = float(self.rest_time_input.text())
            if self.charge_cutoff <= self.discharge_cutoff:
                raise ValueError("Charge cutoff must be higher than discharge cutoff")
            # Start test sequence
            self.test_sequence_thread = QThread()
            self.test_sequence_worker = TestSequenceWorker(
                self.active_device,
                test_current,
                self.charge_cutoff,
                self.discharge_cutoff,
                self.rest_time,
                self.continuous_mode_check.isChecked(),
                self
            )
            _launch(self.test_sequence_worker, self.test_sequence_thread,
                    phase_slot=self.update_test_phase)
            self.status_bar.showMessage(f"Cycle test started | Device: {dev.serial} | Current: {test_current:.4f}A")
        elif mode == "Discharge Test":
            self.discharge_cutoff = float(self.discharge_cutoff_input.text())
            self.discharge_thread = QThread()
            self.discharge_worker = DischargeWorker(self.active_device, test_current, self.discharge_cutoff, self)
            _launch(self.discharge_worker, self.discharge_thread)
            self.status_bar.showMessage(f"Discharge test started | Device: {dev.serial} | Current: {test_current:.4f}A")
        elif mode == "Charge Test":
            self.charge_cutoff = float(self.charge_cutoff_input.text())
            self.charge_thread = QThread()
            self.charge_worker = ChargeWorker(self.active_device, test_current, self.charge_cutoff, self)
            _launch(self.charge_worker, self.charge_thread)
            self.status_bar.showMessage(f"Charge test started | Device: {dev.serial} | Current: {test_current:.4f}A")
    except Exception as e:
        QMessageBox.critical(self, "Error", str(e))
        self.stop_test()
def start_cycle_test(self):
"""Start the battery cycle test.

Stops and disposes of a previous test-sequence worker/thread, resets the
test state and plot, validates the UI inputs, clears the window-level data
buffers and launches a TestSequenceWorker on a new QThread. Input or
start-up errors are shown in a message box.

NOTE(review): this method works on self.time_data / self.voltage_data and
passes self.active_device.dev to the worker, whereas start_test uses the
DeviceManager buffers and passes self.active_device — confirm which path
is still in use.
"""
# Clean up any previous test
if hasattr(self, 'test_sequence_worker'):
try:
self.test_sequence_worker.stop()
except:
pass
self.test_sequence_worker.deleteLater()
if hasattr(self, 'test_sequence_thread'):
self.test_sequence_thread.quit()
# wait() is called without a timeout
self.test_sequence_thread.wait()
self.test_sequence_thread.deleteLater()
del self.test_sequence_thread
self.reset_test()
self.reset_plot()
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
# Reset stop flag
self.request_stop = False
if not self.test_running:
try:
# Get parameters from UI
self.capacity = float(self.capacity_input.text())
self.charge_cutoff = float(self.charge_cutoff_input.text())
self.discharge_cutoff = float(self.discharge_cutoff_input.text())
self.rest_time = float(self.rest_time_input.text())
self.c_rate = float(self.c_rate_input.text())
# Validate inputs
if self.capacity <= 0:
raise ValueError("Battery capacity must be positive")
if self.charge_cutoff <= self.discharge_cutoff:
raise ValueError("Charge cutoff must be higher than discharge cutoff")
if self.c_rate <= 0:
raise ValueError("C-rate must be positive")
# Test current is C-rate times capacity, limited to 0.2 A
test_current = self.c_rate * self.capacity
if test_current > 0.2:
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
# Clear ALL previous data completely
with self.plot_mutex:
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
self.phase_data.clear()
# Reset capacities and timing
self.start_time = time.time()
self.last_update_time = self.start_time
self.capacity_ah = 0.0
self.charge_capacity = 0.0
self.coulomb_efficiency = 0.0
self.cycle_count = 0
self.energy = 0.0
# Reset measurement thread's timer and queues
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
self.measurement_thread.voltage_window.clear()
self.measurement_thread.current_window.clear()
with self.measurement_thread.measurement_queue.mutex:
self.measurement_thread.measurement_queue.queue.clear()
# Reset plot completely
self.reset_plot()
# Start test
self.test_running = True
self.start_time = time.time()
self.last_update_time = time.time()
self.test_phase = "Initial Discharge"
self.phase_label.setText(self.test_phase)
self.toggle_button.setChecked(True)
self.toggle_button.setText("STOP")
self.apply_button_style()
self.status_bar.showMessage(f"Cycle test started | Current: {test_current:.4f}A")
# Create log file
self.create_cycle_log_file()
# Start test sequence in a QThread
self.test_sequence_thread = QThread()
self.test_sequence_worker = TestSequenceWorker(
self.active_device.dev,
test_current,
self.charge_cutoff,
self.discharge_cutoff,
self.rest_time,
self.continuous_mode_check.isChecked(),
self # Pass reference to main window for callbacks
)
self.test_sequence_worker.moveToThread(self.test_sequence_thread)
# Connect signals
self.test_sequence_worker.update_phase.connect(self.update_test_phase)
self.test_sequence_worker.update_status.connect(self.status_bar.showMessage)
self.test_sequence_worker.test_completed.connect(self.finalize_test)
self.test_sequence_worker.error_occurred.connect(self.handle_test_error)
# Tear-down chain: worker finished -> thread quits -> both are deleted later
self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit)
self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater)
self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater)
# Start the thread and the worker's run method
self.test_sequence_thread.start()
QTimer.singleShot(0, self.test_sequence_worker.run)
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
# Ensure buttons are in correct state if error occurs
self.toggle_button.setChecked(False)
self.toggle_button.setText("START")
self.apply_button_style()
self.toggle_button.setEnabled(True)
def start_discharge_test(self):
"""Start the battery discharge test.

Resets the test state and plot, stops and disposes of a previous discharge
worker/thread, validates the UI inputs, clears the window-level data
buffers and launches a DischargeWorker on a new QThread. Input or start-up
errors are shown in a message box.

NOTE(review): like start_cycle_test, this works on self.time_data etc. and
passes self.active_device.dev to the worker, unlike start_test — confirm
which path is still in use.
"""
# Clean up any previous test
self.reset_test() # presumably clears time_data, voltage_data, current_data, display_*, phase_data via DeviceManager.reset_data() — verify
self.reset_plot()
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
if hasattr(self, 'discharge_worker'):
try:
self.discharge_worker.stop()
except:
pass
self.discharge_worker.deleteLater()
if hasattr(self, 'discharge_thread'):
self.discharge_thread.quit()
self.discharge_thread.wait() # wait indefinitely until the thread has really finished
self.discharge_thread.deleteLater()
del self.discharge_thread
# Reset stop flag
self.request_stop = False
if not self.test_running:
try:
# Get parameters from UI
self.capacity = float(self.capacity_input.text())
self.discharge_cutoff = float(self.discharge_cutoff_input.text())
self.c_rate = float(self.c_rate_input.text())
# Validate inputs
if self.capacity <= 0:
raise ValueError("Battery capacity must be positive")
if self.c_rate <= 0:
raise ValueError("C-rate must be positive")
# Test current is C-rate times capacity, limited to 0.2 A
test_current = self.c_rate * self.capacity
if test_current > 0.2:
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
# Clear ALL previous data completely
with self.plot_mutex:
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
# Reset capacities and timing
self.start_time = time.time()
self.last_update_time = self.start_time
self.capacity_ah = 0.0
self.energy = 0.0
# A single discharge is counted as cycle 1
self.cycle_count = 1
# Reset measurement thread's timer and queues
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
self.measurement_thread.voltage_window.clear()
self.measurement_thread.current_window.clear()
with self.measurement_thread.measurement_queue.mutex:
self.measurement_thread.measurement_queue.queue.clear()
# Reset plot completely
self.reset_plot()
# Start test
self.test_running = True
self.start_time = time.time()
self.last_update_time = time.time()
self.test_phase = "Discharge"
self.phase_label.setText(self.test_phase)
self.toggle_button.setChecked(True)
self.toggle_button.setText("STOP")
self.apply_button_style()
self.status_bar.showMessage(f"Discharge started | Current: {test_current:.4f}A")
# Create log file
self.create_cycle_log_file()
# Start discharge worker in a QThread
self.discharge_thread = QThread()
self.discharge_worker = DischargeWorker(
self.active_device.dev,
test_current,
self.discharge_cutoff,
self # Pass reference to main window for callbacks
)
self.discharge_worker.moveToThread(self.discharge_thread)
# Connect signals
self.discharge_worker.update_status.connect(self.status_bar.showMessage)
self.discharge_worker.test_completed.connect(self.finalize_test)
self.discharge_worker.error_occurred.connect(self.handle_test_error)
# Tear-down chain: worker finished -> thread quits -> both are deleted later
self.discharge_worker.finished.connect(self.discharge_thread.quit)
self.discharge_worker.finished.connect(self.discharge_worker.deleteLater)
self.discharge_thread.finished.connect(self.discharge_thread.deleteLater)
# Start the thread and the worker's run method
self.discharge_thread.start()
QTimer.singleShot(0, self.discharge_worker.run)
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
# Ensure buttons are in correct state if error occurs
self.toggle_button.setChecked(False)
self.toggle_button.setText("START")
self.apply_button_style()
self.toggle_button.setEnabled(True)
def start_charge_test(self):
"""Start the battery charge test.

Stops and disposes of a previous charge worker/thread, resets the test
state and plot, validates the UI inputs, clears the window-level data
buffers and launches a ChargeWorker on a new QThread. Input or start-up
errors are shown in a message box.

NOTE(review): like start_cycle_test, this works on self.time_data etc. and
passes self.active_device.dev to the worker, unlike start_test — confirm
which path is still in use.
"""
# Clean up any previous test
if hasattr(self, 'charge_worker'):
try:
self.charge_worker.stop()
except:
pass
self.charge_worker.deleteLater()
if hasattr(self, 'charge_thread'):
self.charge_thread.quit()
# wait() is called without a timeout
self.charge_thread.wait()
self.charge_thread.deleteLater()
del self.charge_thread
self.reset_test()
self.reset_plot()
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
# Reset stop flag
self.request_stop = False
if not self.test_running:
try:
# Get parameters from UI
self.capacity = float(self.capacity_input.text())
self.charge_cutoff = float(self.charge_cutoff_input.text())
self.c_rate = float(self.c_rate_input.text())
# Validate inputs
if self.capacity <= 0:
raise ValueError("Battery capacity must be positive")
if self.c_rate <= 0:
raise ValueError("C-rate must be positive")
# Test current is C-rate times capacity, limited to 0.2 A
test_current = self.c_rate * self.capacity
if test_current > 0.2:
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
# Clear ALL previous data completely
with self.plot_mutex:
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
# Reset capacities and timing
self.start_time = time.time()
self.last_update_time = self.start_time
self.capacity_ah = 0.0
self.energy = 0.0
# A single charge is counted as cycle 1
self.cycle_count = 1
# Reset measurement thread
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
self.measurement_thread.voltage_window.clear()
self.measurement_thread.current_window.clear()
with self.measurement_thread.measurement_queue.mutex:
self.measurement_thread.measurement_queue.queue.clear()
# Reset plot
self.reset_plot()
# Start test
self.test_running = True
self.start_time = time.time()
self.last_update_time = time.time()
self.test_phase = "Charge"
self.phase_label.setText(self.test_phase)
self.toggle_button.setChecked(True)
self.toggle_button.setText("STOP")
self.apply_button_style()
self.status_bar.showMessage(f"Charge started @ {test_current:.3f}A to {self.charge_cutoff}V")
# Create log file
self.create_cycle_log_file()
# Start charge worker in a QThread
self.charge_thread = QThread()
self.charge_worker = ChargeWorker(
self.active_device.dev,
test_current,
self.charge_cutoff,
self
)
self.charge_worker.moveToThread(self.charge_thread)
# Connect signals
self.charge_worker.update_status.connect(self.status_bar.showMessage)
self.charge_worker.test_completed.connect(self.finalize_test)
self.charge_worker.error_occurred.connect(self.handle_test_error)
# Tear-down chain: worker finished -> thread quits -> both are deleted later
self.charge_worker.finished.connect(self.charge_thread.quit)
self.charge_worker.finished.connect(self.charge_worker.deleteLater)
self.charge_thread.finished.connect(self.charge_thread.deleteLater)
# Start the thread
self.charge_thread.start()
QTimer.singleShot(0, self.charge_worker.run)
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
# Restore the start button after a failed start
self.toggle_button.setChecked(False)
self.toggle_button.setText("START")
self.apply_button_style()
self.toggle_button.setEnabled(True)
def start_live_monitoring(self, device):
    """Open a CSV log file for ``device`` and write its header.

    The file is created in ``device.log_dir`` and named after the last eight
    characters of the device serial plus a timestamp. A ``#``-prefixed
    metadata header is written first, followed by the CSV column row, whose
    layout depends on ``self.current_mode``.

    Args:
        device: Per-device state object; this method reads ``is_recording``,
            ``log_dir`` and ``serial`` and sets ``log_file``, ``log_writer``,
            ``is_recording`` and ``_last_log_time``.

    Returns:
        True if the device is recording when the method returns (already
        recording, or recording was started), False if the log could not be
        created.
    """
    try:
        if device.is_recording:
            return True  # Already recording

        # Ensure log directory exists and is writable
        os.makedirs(device.log_dir, exist_ok=True)
        if not os.access(device.log_dir, os.W_OK):
            QMessageBox.critical(self, "Error",
                                 f"No write permissions in {device.log_dir}")
            return False

        # Sample the clock once so the filename stamp and the "Start Time"
        # header line always describe the same instant.
        started_at = datetime.now()
        file_stamp = started_at.strftime("%Y%m%d_%H%M%S")
        header_stamp = started_at.strftime("%Y-%m-%d %H:%M:%S")
        clean_serial = device.serial.replace(":", "_")[-8:]  # Clean for filename
        filename = os.path.join(
            device.log_dir,
            f"battery_test_{clean_serial}_{file_stamp}.csv"
        )

        # newline='' leaves line-ending control to the csv writer
        device.log_file = open(filename, 'w', newline='')
        device.log_writer = csv.writer(device.log_file)

        cycle_mode = self.current_mode == "Cycle Test"

        # Metadata header
        write = device.log_file.write
        write("# ADALM1000 Battery Test Log\n")
        write(f"# Device Serial: {device.serial}\n")
        write(f"# Start Time: {header_stamp}\n")
        write(f"# Test Mode: {self.current_mode}\n")
        if hasattr(self, 'capacity_input'):
            write(f"# Battery Capacity: {self.capacity_input.text()} Ah\n")
        if cycle_mode:
            write(f"# Charge Cutoff: {self.charge_cutoff_input.text()} V\n")
            write(f"# Discharge Cutoff: {self.discharge_cutoff_input.text()} V\n")
            write(f"# Rest Time: {self.rest_time_input.text()} h\n")
        write("#\n")  # End of header

        # Column headers
        if cycle_mode:
            columns = [
                "Time(s)", "Voltage(V)", "Current(A)", "Phase",
                "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
                "Coulomb_Eff(%)", "Cycle",
            ]
        else:
            columns = [
                "Time(s)", "Voltage(V)", "Current(A)", "Phase",
                "Capacity(Ah)", "Power(W)", "Energy(Wh)",
            ]
        device.log_writer.writerow(columns)

        device.is_recording = True
        device._last_log_time = 0
        return True
    except Exception as e:
        error_msg = f"Failed to start recording for {device.serial}:\n{str(e)}"
        logger.exception("Failed to start recording for %s", device.serial)
        QMessageBox.critical(self, "Recording Error", error_msg)
        # Do not leave a half-initialised log behind.
        stale_file = getattr(device, 'log_file', None)
        if stale_file:
            try:
                stale_file.close()
            except OSError:
                pass  # best effort: the file is being discarded anyway
        device.log_file = None
        device.log_writer = None
        return False
def finalize_device_log_file(self, device):
    """Write the summary footer to a device's log file and close it.

    Does nothing when the device is not recording or has no open log file.
    Otherwise a ``# TEST SUMMARY`` footer is appended and the file is closed.
    ``device.log_file``/``device.log_writer`` are cleared and
    ``device.is_recording`` is set to False even if writing the footer fails.

    Args:
        device: Per-device state object holding the log file handle, the
            sample buffers and the accumulated statistics.
    """
    if not device.is_recording or not device.log_file:
        return

    log_file = device.log_file
    try:
        write = log_file.write
        write("\n# TEST SUMMARY\n")
        write(f"# End Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

        if device.time_data:
            write(f"# Duration: {self.format_time(device.time_data[-1])}\n")
            # Guard each buffer separately: an empty one must not raise and
            # cut off the capacity/energy lines that follow.
            if device.voltage_data:
                write(f"# Final Voltage: {device.voltage_data[-1]:.4f} V\n")
            if device.current_data:
                write(f"# Final Current: {device.current_data[-1]:.4f} A\n")

        write(f"# Total Capacity: {device.capacity_ah:.6f} Ah\n")
        write(f"# Total Energy: {device.energy:.6f} Wh\n")

        if self.current_mode == "Cycle Test":
            write(f"# Cycles Completed: {device.cycle_count}\n")
            if device.charge_capacity > 0:
                write(f"# Coulomb Efficiency: {device.coulomb_efficiency:.2f}%\n")
    except Exception:
        logger.exception("Error writing footer for %s", device.serial)
    finally:
        try:
            log_file.close()
        except OSError as close_error:
            logger.warning("Error closing log file for %s: %s",
                           device.serial, close_error)
        device.log_file = None
        device.log_writer = None
        device.is_recording = False
def format_time(self, seconds):
    """Render a duration in seconds as a zero-padded ``hh:mm:ss`` string.

    Fractional seconds are truncated; the hour field grows beyond two digits
    when needed.
    """
    whole_hours = int(seconds // 3600)
    whole_minutes = int((seconds % 3600) // 60)
    leftover_seconds = int(seconds % 60)
    fields = (whole_hours, whole_minutes, leftover_seconds)
    return ":".join(f"{field:02d}" for field in fields)
@safe_execute
def stop_test(self):
    """Stop the running test, put the device output in HI_Z and reset the UI.

    The toggle button is recoloured immediately as visual feedback. If no
    test is running, only the button is reset. Otherwise the stop flags are
    set, every known worker is asked to stop, channel A of the active device
    is switched to HI_Z with a constant of 0, and the test data, plot, button
    and log file are cleaned up.
    """
    # Immediate red button feedback
    self.toggle_button.setStyleSheet(f"""
QPushButton {{
background-color: {self.status_colors["warning"]};
color: white;
font-weight: bold;
}}
""")
    QApplication.processEvents()

    if not self.test_running:
        self.reset_button_state()
        return

    try:
        # Stop operations
        self.request_stop = True
        self.test_running = False
        self.measuring = False

        # Stop workers. Each stop() is isolated: a worker that cannot be
        # stopped (for example because its Qt object was already deleted)
        # must not prevent the remaining workers from stopping or the device
        # output below from being switched off.
        for worker_name in ('test_sequence_worker', 'discharge_worker', 'charge_worker'):
            worker = getattr(self, worker_name, None)
            if worker is None:
                continue
            try:
                worker.stop()
            except Exception as stop_error:
                logger.warning("Could not stop %s: %s", worker_name, stop_error)

        # Reset device
        if self.active_device:
            channel_a = self.active_device.dev.channels['A']
            channel_a.mode = pysmu.Mode.HI_Z
            channel_a.constant(0)
            self.handle_device_connection(True)  # Confirm connection
    except Exception as e:
        self.handle_device_connection(False, e)
    finally:
        # Clean up
        self.reset_test_data()
        self.reset_plot()
        self.reset_button_state()
        if hasattr(self, 'current_cycle_file'):
            self.finalize_log_file()
        self.status_bar.showMessage("Test stopped")
def set_connection_status(self, text, color=None):
    """Show ``text`` as the connection status and colour the status light.

    When ``color`` is omitted it is looked up in ``self.status_colors`` from
    the wording of ``text``: "error" selects the error colour, "disconnect"
    the disconnected colour, anything else the connected colour.
    """
    if color is None:
        lowered = text.lower()
        if "error" in lowered:
            palette_key = "error"
        elif "disconnect" in lowered:
            palette_key = "disconnected"
        else:
            palette_key = "connected"
        color = self.status_colors[palette_key]

    self.connection_label.setText(text)
    light_style = f"""
background-color: {color};
border-radius: 10px;
min-width: 12px;
max-width: 12px;
min-height: 12px;
max-height: 12px;
"""
    self.status_light.setStyleSheet(light_style)
    # Process pending events so the change becomes visible right away
    QApplication.processEvents()
def reset_button_state(self):
    """Uncheck the toggle button and restore its idle caption and style.

    The caption depends on ``self.current_mode``; unknown or missing modes
    fall back to "START".
    """
    active_mode = getattr(self, 'current_mode', 'Live Monitoring')
    if active_mode == 'Cycle Test':
        caption = "START CYCLE TEST"
    elif active_mode == 'Discharge Test':
        caption = "START DISCHARGE"
    elif active_mode == 'Charge Test':
        caption = "START CHARGE"
    else:
        caption = "START"

    button = self.toggle_button
    button.setChecked(False)
    button.setText(caption)
    self.apply_button_style()
    # The explicit stylesheet below is applied after apply_button_style()
    idle_style = f"""
QPushButton {{
background-color: {self.success_color};
color: {self.fg_color};
font-weight: bold;
}}
QPushButton:checked {{
background-color: {self.warning_color};
}}
QPushButton:disabled {{
background-color: #4C566A;
}}
"""
    button.setStyleSheet(idle_style)
def reset_button_style(self):
    """Restore the toggle button's idle caption and stylesheet.

    The checked state is left untouched; only the text (chosen from
    ``self.current_mode``, defaulting to "START") and the stylesheet change.
    """
    captions = (
        ('Cycle Test', "START CYCLE TEST"),
        ('Discharge Test', "START DISCHARGE"),
        ('Charge Test', "START CHARGE"),
    )
    selected_mode = getattr(self, 'current_mode', 'Live Monitoring')
    caption = "START"
    for mode_name, mode_caption in captions:
        if mode_name == selected_mode:
            caption = mode_caption
            break
    self.toggle_button.setText(caption)

    default_style = f"""
QPushButton {{
background-color: {self.success_color};
color: {self.fg_color};
font-weight: bold;
}}
QPushButton:checked {{
background-color: {self.warning_color};
}}
"""
    self.toggle_button.setStyleSheet(default_style)
def closeEvent(self, event):
    """Shut the application down cleanly when the window is closed.

    Stops any running test and the status timer, finalizes the log file of
    every device that is still recording, stops the measurement thread
    (waiting up to 1000 ms for it) and finally accepts the close event.
    """
    self.stop_test()
    self.status_timer.stop()

    # Close the log of every device that is still recording
    for entry in self.devices.values():
        if not entry.is_recording:
            continue
        self.finalize_device_log_file(entry)

    # Stop the measurement thread, if one exists
    sampler = getattr(self, 'measurement_thread', None)
    if sampler:
        sampler.stop()
        sampler.wait(1000)

    event.accept()
def reset_test_data(self):
    """Clear the active device's buffers and statistics and zero the readouts.

    Does nothing when there is no active device.
    """
    dev = self.active_device
    if not dev:
        return

    # Raw sample buffers first, then the display buffers
    for buffer_name in (
        'time_data', 'voltage_data', 'current_data',
        'display_time_data', 'display_voltage_data', 'display_current_data',
    ):
        getattr(dev, buffer_name).clear()

    # Accumulated statistics
    for stat_name in ('capacity_ah', 'energy', 'charge_capacity', 'coulomb_efficiency'):
        setattr(dev, stat_name, 0.0)
    dev.test_phase = "Idle"

    # UI readouts (the elapsed-time label is intentionally left untouched)
    for label_name, blank_text in (
        ('voltage_label', "0.000"),
        ('current_label', "0.000"),
        ('capacity_label', "0.0000"),
        ('energy_label', "0.0000"),
        ('phase_label', "Idle"),
    ):
        getattr(self, label_name).setText(blank_text)
def finalize_test(self):
    """Final cleanup after a test completes or is stopped.

    Steps, in order:
      1. clear the ``measuring``/``test_running`` flags,
      2. switch channels A and B of the active device to HI_Z with a
         constant of 0,
      3. stop and schedule deletion of the test-sequence, discharge and
         charge worker/thread pairs, dropping the thread attributes,
      4. finalize the log file,
      5. reset the toggle button and the ``request_stop`` flag,
      6. unless the user requested the stop, report completion in a dialog
         (Cycle/Discharge modes) and in the status bar.

    Any unexpected error is printed with its traceback; the toggle button is
    then restored so the UI is never left locked.
    """

    def _teardown(thread_attr, worker_attr, label):
        """Stop one worker/thread pair, schedule deletion, drop the thread attribute."""
        if not hasattr(self, thread_attr):
            return
        thread = getattr(self, thread_attr)
        try:
            if thread.isRunning():
                if hasattr(self, worker_attr):
                    try:
                        getattr(self, worker_attr).stop()
                    except RuntimeError:
                        pass  # wrapped Qt object was already deleted
                thread.quit()
                thread.wait(500)
        except Exception as e:
            print(f"Error stopping {label} thread: {e}")
        finally:
            if hasattr(self, worker_attr):
                try:
                    worker = getattr(self, worker_attr)
                    if not sip.isdeleted(worker):
                        worker.deleteLater()
                except Exception:
                    pass  # best effort: the object may already be gone
            try:
                if not sip.isdeleted(thread):
                    thread.deleteLater()
            except Exception:
                pass  # best effort: the object may already be gone
            finally:
                if hasattr(self, thread_attr):
                    delattr(self, thread_attr)

    try:
        # Remember why we are here BEFORE the flag is cleared below;
        # otherwise the completion dialog would also appear after a
        # user-requested stop.
        stopped_by_user = bool(getattr(self, 'request_stop', False))

        # 1. Stop any active measurement or test operations
        self.measuring = False
        self.test_running = False

        # 2. Reset device to safe state (each channel independently, so a
        #    failure on one does not leave the other untouched)
        active = getattr(self, 'active_device', None)
        if active:
            for channel_name in ('A', 'B'):
                try:
                    channel = active.dev.channels[channel_name]
                    channel.mode = pysmu.Mode.HI_Z
                    channel.constant(0)
                except Exception as e:
                    print(f"Error resetting device in finalize: {e}")

        # 3. Clean up worker threads
        _teardown('test_sequence_thread', 'test_sequence_worker', 'test sequence')
        _teardown('discharge_thread', 'discharge_worker', 'discharge')
        _teardown('charge_thread', 'charge_worker', 'charge')

        # 4. Finalize log file
        self.finalize_log_file()

        # 5. Reset UI and state
        self.request_stop = False
        self.toggle_button.setChecked(False)
        self.toggle_button.setText("START")
        self.toggle_button.setEnabled(True)
        self.apply_button_style()  # applied once, after the final button state is set
        self.test_running = False

        # 6. Show completion message if test wasn't stopped by user
        if stopped_by_user:
            return

        mode = self.current_mode
        if mode in ("Cycle Test", "Discharge Test"):
            test_current = self.c_rate * self.capacity
            # Avoid ZeroDivisionError for a zero C-rate
            rate_text = f"C/{1/self.c_rate:.1f}" if self.c_rate else "C-rate n/a"
            test_conditions = (self.test_conditions_input.text()
                               if hasattr(self, 'test_conditions_input') else "N/A")

        if mode == "Cycle Test":
            message = (
                f"Cycle test completed | "
                f"Cycle {self.cycle_count} | "
                f"Capacity: {self.capacity_ah:.4f}Ah | "
                f"Efficiency: {self.coulomb_efficiency:.1f}%"
            )
            QMessageBox.information(
                self,
                "Test Completed",
                f"Cycle test completed successfully.\n\n"
                f"Test Parameters:\n"
                f"- Capacity: {self.capacity} Ah\n"
                f"- Current: {test_current:.4f} A ({rate_text})\n"
                f"- Charge Cutoff: {self.charge_cutoff} V\n"
                f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
                f"- Conditions: {test_conditions}\n\n"
                f"Results:\n"
                f"- Cycles: {self.cycle_count}\n"
                f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n"
                f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%"
            )
        elif mode == "Discharge Test":
            message = (
                f"Discharge completed | "
                f"Capacity: {self.capacity_ah:.4f}Ah | "
                f"Energy: {self.energy:.4f}Wh"
            )
            QMessageBox.information(
                self,
                "Discharge Completed",
                f"Discharge test completed successfully.\n\n"
                f"Test Parameters:\n"
                f"- Capacity: {self.capacity} Ah\n"
                f"- Current: {test_current:.4f} A ({rate_text})\n"
                f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
                f"- Conditions: {test_conditions}\n\n"
                f"Results:\n"
                f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n"
                f"- Energy delivered: {self.energy:.4f}Wh"
            )
        else:
            # Other modes (e.g. "Charge Test") have no dedicated dialog; they
            # still need a status message instead of an unbound variable.
            message = f"{mode} completed"
        self.status_bar.showMessage(message)

    except Exception as e:
        print(f"Error in finalize_test: {e}")
        import traceback
        traceback.print_exc()
        # Ensure we don't leave the UI in a locked state
        self.toggle_button.setChecked(False)
        self.toggle_button.setText("START")
        self.apply_button_style()
        self.toggle_button.setEnabled(True)
        self.status_bar.showMessage("Error during test finalization")
def reset_plot(self):
    """Empty both plot lines, restore the default axis limits and redraw."""
    # Drop the plotted samples
    for curve in (self.line_voltage, self.line_current):
        curve.set_data([], [])

    # Default view: 10 s wide, full voltage and current ranges
    voltage_axis, current_axis = self.ax, self.ax2
    voltage_axis.set_xlim(0, 10)
    voltage_axis.set_ylim(0, 5.0)
    current_axis.set_ylim(-0.25, 0.25)

    # Redraw after a short delay so pending UI updates are handled first
    redraw = self.canvas.draw_idle
    QTimer.singleShot(50, redraw)
def update_status_and_plot(self):
    """Refresh the status display, then the plot."""
    for refresh in (self.update_status, self.update_plot):
        refresh()
def update_plot(self):
    """Redraw the voltage/current curves from the active device's display buffers.

    The display buffers are copied while ``self.plot_mutex`` is held; the
    copies are then pushed into the plot lines. Axes are rescaled once the
    plotted data spans more than one second. Failures are logged, never
    raised.
    """
    device = self.active_device
    if not device:
        logger.warning("No active device in update_plot")
        return

    try:
        with self.plot_mutex:
            if not device.display_time_data:
                logger.warning(f"No data to plot for {device.serial}")
                return
            times = list(device.display_time_data)
            volts = list(device.display_voltage_data)
            amps = list(device.display_current_data)

        self.line_voltage.set_data(times, volts)
        self.line_current.set_data(times, amps)

        # Rescale only once the data covers a meaningful time span
        spans_enough = len(times) > 1 and times[-1] - times[0] > 1.0
        if spans_enough:
            self.auto_scale_axes()

        self.canvas.draw_idle()
        logger.debug(f"Plot updated for {device.serial} with {len(times)} points")
    except Exception as e:
        logger.error(f"Plot update failed: {str(e)}")
def auto_scale_axes(self):
    """Fit the plot axes to the active device's data, within fixed bounds.

    The time axis only grows: it is extended to 105 % of the newest timestamp
    once that timestamp passes 95 % of the current right limit. The voltage
    axis is clamped to [0, 5.0] and the current axis to [-0.25, 0.25]; each is
    updated only when a limit would move by more than a small threshold.
    """
    device = self.active_device
    if not device or not device.time_data:
        return

    # Time axis (shared by both y-axes)
    x_start = 0
    newest_time = device.time_data[-1]
    shown_x = self.ax.get_xlim()
    if newest_time > shown_x[1] * 0.95:
        x_end = newest_time * 1.05
        for axis in (self.ax, self.ax2):
            axis.set_xlim(x_start, x_end)

    # Voltage axis
    if device.voltage_data:
        v_margin = 0.2
        v_low = max(0, min(device.voltage_data) - v_margin)
        v_high = min(5.0, max(device.voltage_data) + v_margin)
        shown_v = self.ax.get_ylim()
        low_moved = abs(shown_v[0] - v_low) > 0.1
        if low_moved or abs(shown_v[1] - v_high) > 0.1:
            self.ax.set_ylim(v_low, v_high)

    # Current axis
    if device.current_data:
        i_margin = 0.05
        i_low = max(-0.25, min(device.current_data) - i_margin)
        i_high = min(0.25, max(device.current_data) + i_margin)
        shown_i = self.ax2.get_ylim()
        low_moved = abs(shown_i[0] - i_low) > 0.02
        if low_moved or abs(shown_i[1] - i_high) > 0.02:
            self.ax2.set_ylim(i_low, i_high)
@pyqtSlot(str)
def handle_device_error(self, error_msg):
    """React to a device error: log it, flag it in the UI and schedule a reconnect.

    The reconnect button is shown and enabled, the toggle button is disabled,
    and ``reconnect_device`` is scheduled to run after 2000 ms.
    """
    logger.error(f"Device error: {error_msg}")
    self.set_connection_status(f"Error: {error_msg}", "orange")

    retry_button = self.reconnect_btn
    retry_button.setVisible(True)
    retry_button.setEnabled(True)
    self.toggle_button.setEnabled(False)

    # Attempt automatic recovery after a short delay
    recovery_delay_ms = 2000
    QTimer.singleShot(recovery_delay_ms, self.reconnect_device)
def validate_measurements(self, voltage, current):
    """Return True when a voltage/current sample is plausible.

    A sample is accepted when the voltage lies in [0, 5.0] and the magnitude
    of the current is at most 0.3. Both checks are written as positive range
    tests so that NaN fails them: the previous ``abs(current) > 0.3`` test
    let a NaN current through.

    Args:
        voltage: Measured voltage.
        current: Measured current.

    Returns:
        bool: True if both values are within range, False otherwise.
    """
    voltage_ok = 0 <= voltage <= 5.0
    current_ok = abs(current) <= 0.3
    return voltage_ok and current_ok
@pyqtSlot(str)
def update_test_phase(self, phase_text):
    """Store ``phase_text`` as the current test phase and show it in the phase label."""
    self.test_phase = phase_text
    phase_display = self.phase_label
    phase_display.setText(phase_text)
@pyqtSlot(str)
def handle_test_error(self, error_msg):
    """Handle an error reported by a test worker: stop, notify, reset, reconnect.

    The test is stopped BEFORE the user is notified. ``QMessageBox.critical``
    is a modal dialog that blocks until dismissed, so showing it first would
    leave the test (and the device output) active for as long as the dialog
    stays open.

    If the handler itself fails, a fatal-error dialog is shown and the window
    is scheduled to close.
    """
    try:
        # 1. Stop all operations first (see docstring)
        self.stop_test()

        # 2. Notify user
        QMessageBox.critical(self, "Test Error",
                             f"An error occurred:\n{error_msg}\n\nAttempting to recover...")

        # 3. Reset UI elements
        if hasattr(self, 'line_voltage'):
            try:
                self.line_voltage.set_data([], [])
                self.line_current.set_data([], [])
                self.ax.set_xlim(0, 1)
                self.ax2.set_xlim(0, 1)
                self.canvas.draw()
            except Exception as plot_error:
                print(f"Plot reset error: {plot_error}")

        # 4. Update status
        self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...")
        self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")

        # 5. Attempt recovery after a short delay
        QTimer.singleShot(1000, self.attempt_reconnect)
    except Exception as e:
        print(f"Error in error handler: {e}")
        # Fallback: tell the user and close the application window
        QMessageBox.critical(self, "Fatal Error",
                             "The application needs to restart due to an unrecoverable error")
        QTimer.singleShot(1000, self.close)
def attempt_reconnect(self):
    """Tell the user the connection failed, then schedule ``reconnect_device`` in 1000 ms."""
    notice = (
        "Could not connect to ADALM1000\n\n"
        "1. Check USB cable connection\n"
        "2. The device will attempt to reconnect automatically"
    )
    QMessageBox.critical(self, "Device Connection Error", notice)
    retry_delay_ms = 1000
    QTimer.singleShot(retry_delay_ms, self.reconnect_device)
def cleanup_test_threads(self):
    """Stop and schedule deletion of leftover test threads before a new test.

    Handles the test-sequence, discharge and charge worker/thread pairs in
    that order. For each pair whose thread attribute exists, the worker is
    asked to stop, the thread is told to quit and is waited on for up to
    500 ms, and both objects are scheduled for deletion with
    ``deleteLater()``.

    A failing ``worker.stop()`` no longer skips ``thread.quit()``/``wait()``;
    previously the still-running thread went straight to ``deleteLater()``.
    """
    pairs = (
        ('test_sequence_thread', 'test_sequence_worker', 'test sequence'),
        ('discharge_thread', 'discharge_worker', 'discharge'),
        ('charge_thread', 'charge_worker', 'charge'),
    )
    for thread_attr, worker_attr, label in pairs:
        if not hasattr(self, thread_attr):
            continue
        thread = getattr(self, thread_attr)
        worker = getattr(self, worker_attr, None)

        # Ask the worker to stop; failure here must not skip quit()/wait().
        if worker is not None:
            try:
                worker.stop()
            except Exception as e:
                print(f"Error stopping {label} worker: {e}")

        # Shut the thread's event loop down and give it time to finish.
        if thread is not None:
            try:
                thread.quit()
                thread.wait(500)
            except Exception as e:
                print(f"Error cleaning up {label} thread: {e}")

        # Schedule deletion; best effort, the Qt objects may already be gone.
        for qt_object in (worker, thread):
            if qt_object is None:
                continue
            try:
                qt_object.deleteLater()
            except Exception:
                pass
def reconnect_device(self):
    """Tear down the current device session and establish a new one.

    Steps:
      1. stop the measurement thread (terminating it if it does not finish
         within 500 ms) and end the existing session,
      2. create and scan a new ``pysmu.Session`` and start it,
      3. pick the device with the previously active serial, or the first
         device found when no previous serial is known,
      4. start the measurement for that device and reconnect its signals,
      5. update the connection status, toggle button and device dropdown.

    Every failure is reported through ``handle_device_connection(False, ...)``;
    nothing is raised.
    """
    try:
        self.handle_device_connection(False, "Reconnecting...")

        # 1. Clean up existing connections
        old_thread = getattr(self, 'measurement_thread', None)
        if old_thread:
            try:
                old_thread.stop()
                if not old_thread.wait(500):
                    old_thread.terminate()
            except Exception as e:
                logger.warning("Error stopping measurement thread: %s", e)

        old_session = getattr(self, 'session', None)
        if old_session:
            try:
                old_session.end()
            except Exception as e:
                logger.warning("Error ending session: %s", e)

        # 2. Initialize new session
        self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
        self.session.scan()
        if not self.session.devices:
            self.handle_device_connection(False, "No devices detected")
            return
        self.session.start(0)

        # 3. Re-establish device connection
        previous = getattr(self, 'active_device', None)
        target_serial = previous.serial if previous else None
        if target_serial:
            # Try to find the same device by serial
            dev = next(
                (candidate for candidate in self.session.devices
                 if candidate.serial == target_serial),
                None,
            )
            if dev is None:
                # Never continue with the stale manager of the old session.
                self.handle_device_connection(
                    False, f"Device {target_serial} not found")
                return
        else:
            # No previous device, just use first available
            dev = self.session.devices[0]

        # DeviceManager's parameters are (session, dev, serial); pass them by
        # keyword so the device is not bound to ``session`` by position.
        self.active_device = DeviceManager(
            session=self.session, dev=dev, serial=dev.serial)
        self.devices[dev.serial] = self.active_device

        # 4. Restart measurement system and reconnect signals
        self.active_device.start_measurement(self.interval)
        self.measurement_thread = self.active_device.measurement_thread
        self.measurement_thread.update_signal.connect(self.update_measurements)
        self.measurement_thread.error_signal.connect(self.handle_device_error)

        # 5. Update UI
        self.handle_device_connection(True, f"Reconnected: {self.active_device.serial}")
        self.toggle_button.setEnabled(True)

        # Update device dropdown
        self.device_combo.clear()
        for serial in self.devices:
            self.device_combo.addItem(serial)
        self.device_combo.setCurrentText(self.active_device.serial)
    except Exception as e:
        self.handle_device_connection(False, f"Reconnect failed: {str(e)}")
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window and
    # run the event loop. The event loop's return code becomes the process
    # exit status; a startup failure is reported in a dialog and exits with 1.
    app = QApplication(sys.argv)
    try:
        window = BatteryTester()
        window.show()
        sys.exit(app.exec_())
    except Exception as e:
        traceback.print_exc()
        QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}")
        sys.exit(1)