From b10f210189b84371eb704398268081ce98321141 Mon Sep 17 00:00:00 2001 From: Jan Date: Sun, 10 Aug 2025 15:17:22 +0200 Subject: [PATCH] MainCode/adalm1000_logger.py aktualisiert MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit zwei geräte kein graf --- MainCode/adalm1000_logger.py | 3168 +++++++--------------------------- 1 file changed, 671 insertions(+), 2497 deletions(-) diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py index d3d4606..afa5fa5 100644 --- a/MainCode/adalm1000_logger.py +++ b/MainCode/adalm1000_logger.py @@ -1,670 +1,448 @@ # -*- coding: utf-8 -*- import os import time -import csv -import threading -import traceback -from datetime import datetime -import numpy as np -import matplotlib -import subprocess -import logging import sys -from usb.core import USBError +import csv +import traceback +import numpy as np +import pysmu +import threading +from datetime import datetime +from collections import deque +import matplotlib matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure -from collections import deque -from queue import Queue, Full, Empty from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog, QComboBox) from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread from PyQt5.QtGui import QDoubleValidator -from PyQt5 import sip -from PyQt5.QtCore import pyqtSignal -import pysmu -from pysmu import Session - import matplotlib as mpl + mpl.rcParams['font.family'] = 'sans-serif' -mpl.rcParams['font.sans-serif'] = ['DejaVu Sans', 'Arial', 'Liberation Sans', 'Verdana'] # Fallback fonts +mpl.rcParams['font.sans-serif'] = ['DejaVu Sans', 'Arial', 'Liberation Sans', 'Verdana'] mpl.rcParams['axes.edgecolor'] = '#D8DEE9' mpl.rcParams['text.color'] = '#D8DEE9' mpl.rcParams['axes.labelcolor'] = '#D8DEE9' mpl.rcParams['xtick.color'] = '#D8DEE9' mpl.rcParams['ytick.color'] = '#D8DEE9' -logging.getLogger('matplotlib.font_manager').setLevel(logging.WARNING) -logging.basicConfig( - level=logging.DEBUG, - format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", - handlers=[ - logging.StreamHandler(sys.stdout), - logging.FileHandler("adalm_debug.log", mode="a") - ] -) -logger = logging.getLogger("adalm") +SAMPLE_RATE = 100000 # Samples per second +BLOCK_SIZE = 500 # Samples per read +MIN_VOLTAGE = 0.0 # Minimum allowed voltage +MAX_VOLTAGE = 5.0 # Maximum allowed voltage +UPDATE_INTERVAL = 100 # GUI update interval in ms -class DeviceDisconnectedError(Exception): - pass +class ADALM1000Worker(QObject): + data_ready = pyqtSignal(float, float, float, str) + status_update = pyqtSignal(str, str) + test_completed = pyqtSignal(str) + error_occurred = pyqtSignal(str, str) + capacity_update = pyqtSignal(float, float, int) -class DeviceManager: - def __init__(self, session=None, dev=None, serial=None): - self.session = session - self.dev = dev - self.serial = serial - self.measurement_thread = None - self.lock = threading.RLock() # reentrant lock - self._last_open_attempt = 0 - logger.info(f"Created DeviceManager for {serial}") - mpl.font_manager._get_font.cache_clear() - self.measurement_thread = MeasurementThread(self, 0.1, self) - - # Initialize data buffers and statistics - self.time_data = deque(maxlen=10000) - self.voltage_data = deque(maxlen=10000) - self.current_data = deque(maxlen=10000) - self.display_time_data = deque(maxlen=1000) - self.display_voltage_data = deque(maxlen=1000) - self.display_current_data = deque(maxlen=1000) - self.capacity_ah = 0.0 - self.energy = 0.0 - self.charge_capacity = 0.0 - self.coulomb_efficiency = 0.0 - self.cycle_count = 0 - self.test_phase = "Idle" - - # Add these new attributes for recording state - self.start_time = time.time() # Initialize with current time - self.last_update_time = self.start_time - self.is_recording = False + def __init__(self, device, dev_idx): + super().__init__() + self.device = device + self.dev_idx = dev_idx + self.running = True + self.measuring = False + self.logging = False self.log_file = None self.log_writer = None - self._last_log_time = 0 - self.log_dir = os.path.expanduser("~/adalm1000/logs") - os.makedirs(self.log_dir, exist_ok=True) - self.start_time = None - self.last_measurement_time = None - - def start_measurement(self): - if not self.measurement_thread.isRunning(): - self.measurement_thread.start() - - def reset_data(self): - """Reset all data buffers and statistics for the device""" - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - self.display_time_data.clear() - self.display_voltage_data.clear() - self.display_current_data.clear() - self.capacity_ah = 0.0 - self.energy = 0.0 - self.charge_capacity = 0.0 - self.coulomb_efficiency = 0.0 - self.cycle_count = 0 + self.test_running = False + self.request_stop = False self.test_phase = "Idle" - # Reset recording state - self.is_recording = False - self._last_log_time = 0 + self.cycle_count = 0 + self.capacity_ah = 0.0 + self.energy_wh = 0.0 + self.start_time = 0 + self.last_time = 0 + self.last_voltage = 0 + self.last_current = 0 + self.mode = "Live Monitoring" + self.params = { + 'capacity': 1.0, + 'c_rate': 0.1, + 'charge_cutoff': 1.43, + 'discharge_cutoff': 0.01, + 'rest_time': 0.5, + 'continuous': False + } + self.set_hiz() - def safe_call(self, fn, *args, **kwargs): - """Call fn under device lock and handle USB errors.""" - with self.lock: - try: - return fn(*args, **kwargs) - except (USBError, OSError, pysmu.exceptions.USBError) as e: - logger.exception(f"USB error in safe_call for {self.serial}") - raise DeviceDisconnectedError(f"USB error: {e}") from e - except Exception as e: - logger.exception(f"Error in safe_call for {self.serial}") - raise + def set_hiz(self): + """Set device to High Impedance mode""" + for ch in self.device.channels.values(): + ch.mode = pysmu.Mode.HI_Z + ch.constant(0) - def set_simv_current(self, current): - """Set channel A to SIMV and apply current.""" - def _do(): - chA = self.dev.channels['A'] - if hasattr(chA, 'Mode'): - try: - chA.mode = chA.Mode.SIMV - except Exception: - logger.debug("Couldn't set chA.mode to SIMV") - if hasattr(chA, 'constant'): - chA.constant(abs(current)) - else: - try: - chA.set_current(abs(current)) - except Exception: - logger.debug("No channel.constant or set_current available") - logger.debug(f"Device {self.serial}: set_simv_current {current}") - return self.safe_call(_do) + def set_simv(self, current): + """Set device to SIMV mode with specified current""" + self.device.channels['B'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].constant(current) - def hi_z(self): - def _do(): - chA = self.dev.channels['A'] - if hasattr(chA, 'Mode'): - try: - chA.mode = chA.Mode.HI_Z - except Exception: - logger.debug("Couldn't set chA.mode to HI_Z") - if hasattr(chA, 'constant'): - chA.constant(0) - logger.debug(f"Device {self.serial}: hi_z()") - return self.safe_call(_do) - - def close(self): - """Close session/device references safely.""" - with self.lock: - try: - if self.dev: - try: - if hasattr(self.dev, 'close'): - self.dev.close() - except Exception: - logger.exception(f"Error closing device {self.serial}") - self.dev = None - if self.session: - try: - if hasattr(self.session, 'close'): - self.session.close() - except Exception: - logger.exception(f"Error closing session for {self.serial}") - self.session = None - except Exception: - logger.exception(f"Error in DeviceManager.close for {self.serial}") - - def reopen_with_backoff(self, max_attempts=5, base_delay=1.0): - """Try to reopen the device with exponential backoff.""" - now = time.time() - if now - self._last_open_attempt < 0.2: - time.sleep(0.2) - self._last_open_attempt = now - - for attempt in range(1, max_attempts + 1): - try: - logger.info(f"Attempting to reopen device {self.serial} (attempt {attempt}/{max_attempts})") - from pysmu import Session - s = Session() - devs = s.list_devices() - chosen = None - for d in devs: - try: - if hasattr(d, 'serial') and d.serial == self.serial: - chosen = s.open(d.serial) - break - if isinstance(d, (list, tuple)) and str(d[0]) == str(self.serial): - chosen = s.open(str(d[0])) - break - except Exception: - continue - if chosen is None and devs: - logger.warning(f"Could not find serial {self.serial}; opening first device") - chosen = s.open(devs[0].serial if hasattr(devs[0], 'serial') else str(devs[0][0])) - if chosen: - with self.lock: - try: - if self.dev and hasattr(self.dev, 'close'): - self.dev.close() - except Exception: - logger.exception("Error closing previous dev") - self.session = s - self.dev = chosen - logger.info(f"Reopened device {self.serial} OK") - return True - except Exception as e: - logger.exception(f"Reopen attempt {attempt} failed for {self.serial}: {e}") - time.sleep(base_delay * attempt) - logger.error(f"Failed to reopen device {self.serial} after {max_attempts} attempts") - return False - -class MeasurementThread(QThread): - update_signal = pyqtSignal(str, float, float, float) - error_signal = pyqtSignal(str) - - def __init__(self, device_manager, interval, parent_manager): - super().__init__() - self.dev_manager = device_manager - self.interval = interval - self._running = True - self.filter_window_size = 10 - self.voltage_window = [] - self.current_window = [] - self.start_time = None - self.measurement_queue = Queue(maxsize=1) - self.current_direction = 1 - self.parent_manager = parent_manager - - def stop(self): - self._running = False + def start_logging(self, filename): + """Start data logging to CSV file""" try: - self.dev_manager.hi_z() + self.log_file = open(filename, 'w', newline='') + self.log_writer = csv.writer(self.log_file) + self.log_writer.writerow([ + "Time(s)", "Voltage(V)", "Current(A)", "Phase", + "Capacity(Ah)", "Power(W)", "Energy(Wh)" + ]) + self.logging = True + return True except Exception as e: - logger.error(f"Error stopping device: {e}") - self.wait(500) + self.error_occurred.emit(f"Device {self.dev_idx}", f"Log start failed: {str(e)}") + return False + + def stop_logging(self): + """Stop data logging and close file""" + if self.logging and self.log_file: + try: + # Write summary + if hasattr(self, 'start_time') and self.start_time: + duration = time.time() - self.start_time + self.log_file.write("\n# TEST SUMMARY\n") + self.log_file.write(f"# End Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + self.log_file.write(f"# Duration: {self.format_time(duration)}\n") + self.log_file.write(f"# Total Capacity: {self.capacity_ah:.6f} Ah\n") + self.log_file.write(f"# Total Energy: {self.energy_wh:.6f} Wh\n") + self.log_file.write(f"# Cycle Count: {self.cycle_count}\n") + + self.log_file.close() + except Exception as e: + self.error_occurred.emit(f"Device {self.dev_idx}", f"Log stop failed: {str(e)}") + finally: + self.log_file = None + self.log_writer = None + self.logging = False + + def format_time(self, seconds): + """Convert seconds to hh:mm:ss format""" + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + seconds = int(seconds % 60) + return f"{hours:02d}:{minutes:02d}:{seconds:02d}" + + def start_test(self, mode, params): + """Start a test with specified parameters""" + self.mode = mode + self.params = params + self.test_running = True + self.request_stop = False + self.capacity_ah = 0.0 + self.energy_wh = 0.0 + self.cycle_count = 0 + self.start_time = time.time() + self.last_time = self.start_time + self.test_phase = "Starting" + self.status_update.emit(f"Device {self.dev_idx}", f"{mode} started") + + def stop_test(self): + """Stop current test and reset device""" + self.test_running = False + self.request_stop = True + self.set_hiz() + self.test_phase = "Idle" + self.status_update.emit(f"Device {self.dev_idx}", "Test stopped") def run(self): - logger.info(f"MeasurementThread STARTED for {self.parent_manager.serial}") - if not self.dev_manager.dev: - logger.error("No device available") - self.error_signal.emit("Device not initialized") - return - - # Initialize timing - self.dev_manager.start_time = time.time() - - while self._running: + """Main measurement loop""" + while self.running: try: - start_time = time.time() + if not self.measuring: + time.sleep(0.1) + continue + + # Read a block of samples + samples = self.device.read(BLOCK_SIZE, SAMPLE_RATE, True) - # Get measurement - samples = self.dev_manager.dev.get_samples(1) if not samples: time.sleep(0.01) continue - - sample = samples[0] - voltage = sample[0][0] - current = sample[0][1] + + # Process samples + vA = np.array([s[0][0] for s in samples]) + iA = np.array([s[0][1] for s in samples]) + + # Calculate averages + voltage = np.mean(vA) + current = np.mean(iA) current_time = time.time() - # Validate before sending - if not (0 <= voltage <= 5.0) or not (-0.2 <= current <= 0.2): - continue - - # Emit update - self.update_signal.emit( - self.parent_manager.serial, - voltage, - current, - current_time + # Only calculate elapsed time when a test is running + elapsed = 0 + if self.test_running and self.start_time > 0: + elapsed = current_time - self.start_time + + # Update capacity and energy + if self.last_time > 0 and self.test_running: + dt = current_time - self.last_time + self.capacity_ah += abs(current) * dt / 3600 + self.energy_wh += (voltage * abs(current)) * dt / 3600 + + self.last_time = current_time + self.last_voltage = voltage + self.last_current = current + + # Handle test modes + if self.test_running: + self.handle_test_mode(voltage, current) + else: + self.test_phase = "Monitoring" + + # Emit data to GUI + self.data_ready.emit( + elapsed, + voltage, + current, + self.test_phase ) - # Maintain precise timing - elapsed = time.time() - start_time - sleep_time = max(0.001, self.interval - elapsed) - time.sleep(sleep_time) + # Update capacity display + self.capacity_update.emit( + self.capacity_ah, + self.energy_wh, + self.cycle_count + ) - except (USBError, pysmu.exceptions.USBError) as e: - logger.error(f"USB Error: {str(e)}") - self.error_signal.emit("USB communication error") - time.sleep(1) + # Log data if enabled + if self.logging and self.log_file and not self.log_file.closed: + try: + self.log_writer.writerow([ + f"{elapsed:.4f}", + f"{voltage:.6f}", + f"{current:.6f}", + self.test_phase, + f"{self.capacity_ah:.4f}", + f"{voltage * abs(current):.4f}", + f"{self.energy_wh:.4f}" + ]) + except Exception as e: + self.error_occurred.emit(f"Device {self.dev_idx}", f"Log write error: {str(e)}") except Exception as e: - logger.error(f"Measurement error: {str(e)}") - time.sleep(0.5) + self.error_occurred.emit(f"Device {self.dev_idx}", f"Measurement error: {str(e)}") + traceback.print_exc() + time.sleep(1) - def _do_measure_once(self): - """Actual measurement code protected by DeviceManager lock""" - # This will be called inside DeviceManager.safe_call - samples = self.dev_manager.dev.get_samples(1) - if samples: - sample = samples[0] - voltage = sample[0][0] # Channel A voltage - current = sample[0][1] # Channel A current - return voltage, current, time.time() - return 0, 0, time.time() + def handle_test_mode(self, voltage, current): + """Execute test logic based on current mode""" + if self.mode == "Discharge Test": + self.handle_discharge_test(voltage, current) + elif self.mode == "Charge Test": + self.handle_charge_test(voltage, current) + elif self.mode == "Cycle Test": + self.handle_cycle_test(voltage, current) + elif self.mode == "Live Monitoring": + self.test_phase = "Monitoring" - def set_direction(self, direction): - self.current_direction = direction + def handle_discharge_test(self, voltage, current): + """Discharge test logic""" + self.test_phase = "Discharging" + discharge_current = -abs(self.params['c_rate'] * self.params['capacity']) + + # Set discharge current if not already set + if abs(current - discharge_current) > 0.01: + self.set_simv(discharge_current) + + # Check for discharge cutoff + if voltage <= self.params['discharge_cutoff'] or self.request_stop: + self.set_hiz() + self.test_running = False + self.test_phase = "Completed" + self.test_completed.emit(f"Device {self.dev_idx}") -class TestSequenceWorker(QThread): - finished = pyqtSignal() - update_phase = pyqtSignal(str) - update_status = pyqtSignal(str) - test_completed = pyqtSignal() - error_occurred = pyqtSignal(str) + def handle_charge_test(self, voltage, current): + """Charge test logic""" + self.test_phase = "Charging" + charge_current = abs(self.params['c_rate'] * self.params['capacity']) + + # Set charge current if not already set + if abs(current - charge_current) > 0.01: + self.set_simv(charge_current) + + # Check for charge cutoff + if voltage >= self.params['charge_cutoff'] or self.request_stop: + self.set_hiz() + self.test_running = False + self.test_phase = "Completed" + self.test_completed.emit(f"Device {self.dev_idx}") - def __init__(self, device_manager, test_current, charge_cutoff, discharge_cutoff, - rest_time_hours, continuous_mode, parent=None): - super().__init__(parent) - self.dev_manager = device_manager # DeviceManager instance - self.device = device_manager.dev # raw pysmu device for direct calls - self.test_current = float(test_current) - self.charge_cutoff = float(charge_cutoff) - self.discharge_cutoff = float(discharge_cutoff) - self.rest_time = float(rest_time_hours) * 3600.0 - self.continuous_mode = bool(continuous_mode) - self.parent = parent - self._running = True - self.voltage_timeout = 0.5 - - def stop(self): - self._running = False - - def _set_direction_on_measurement_thread(self, direction): - try: - mt = getattr(self.dev_manager, 'measurement_thread', None) - if mt and hasattr(mt, 'set_direction'): - mt.set_direction(direction) - except Exception: - pass - - def _set_simv_current(self, current): - try: - self.dev_manager.set_simv_current(current) - except DeviceDisconnectedError: - logger.error("Device disconnected during current set") - raise - - def _hi_z(self): - try: - self.dev_manager.hi_z() - except DeviceDisconnectedError: - logger.error("Device disconnected during hi-z") - raise - - def get_latest_measurement(self): - try: - q = self.dev_manager.measurement_thread.measurement_queue - item = q.get(timeout=self.voltage_timeout) - # support multiple formats - if isinstance(item, (tuple, list)): - if len(item) >= 3 and isinstance(item[0], str): - _, v, i, *rest = item - ts = rest[0] if rest else None - return v, i, ts - elif len(item) >= 2: - v = item[0]; i = item[1]; ts = item[2] if len(item) >= 3 else None - return v, i, ts - return None, None, None - except Empty: - return None, None, None - except Exception: - return None, None, None - - def charge_phase(self): - self.update_phase.emit("Charge") - self._set_direction_on_measurement_thread(1) - self._set_simv_current(abs(self.test_current)) - while self._running: - v, i, ts = self.get_latest_measurement() - if v is None: - time.sleep(0.05); continue - if v >= self.charge_cutoff: - break - time.sleep(0.1) - self._hi_z() - - def discharge_phase(self): - self.update_phase.emit("Discharge") - self._set_direction_on_measurement_thread(-1) - self._set_simv_current(-abs(self.test_current)) - while self._running: - v, i, ts = self.get_latest_measurement() - if v is None: - time.sleep(0.05); continue - if v <= self.discharge_cutoff: - break - time.sleep(0.1) - self._hi_z() - - def rest_phase(self, phase_name): - self.update_phase.emit(f"Rest: {phase_name}") - rest_end = time.time() + self.rest_time - while time.time() < rest_end and self._running: - time.sleep(0.5) - self.update_status.emit(f"Resting | {phase_name} | {max(0, rest_end-time.time()):.0f}s left") - - def run(self): - try: - while self._running: - self.discharge_phase() - if not self._running: break - if self.rest_time > 0: self.rest_phase("Post-Discharge") - if not self._running: break - self.charge_phase() - if not self._running: break - if self.rest_time > 0: self.rest_phase("Post-Charge") - if not self.continuous_mode: - break - self.test_completed.emit() - except Exception as e: - self.error_occurred.emit(str(e)) - finally: - try: self._hi_z() - except: pass - self.finished.emit() - -class DischargeWorker(QThread): - finished = pyqtSignal() - update_status = pyqtSignal(str) - test_completed = pyqtSignal() - error_occurred = pyqtSignal(str) - - def __init__(self, device_manager, discharge_current, cutoff_voltage, parent=None): - super().__init__(parent) - self.dev_manager = device_manager - self.device = device_manager.dev - self.discharge_current = float(discharge_current) - self.cutoff_voltage = float(cutoff_voltage) - self._running = True - self.voltage_timeout = 0.5 - - def stop(self): - self._running = False - - def _set_direction_on_measurement_thread(self, direction): - try: - mt = getattr(self.dev_manager, 'measurement_thread', None) - if mt and hasattr(mt, 'set_direction'): - mt.set_direction(direction) - except Exception: - pass - - def _set_simv_current(self, current): - try: - self.dev_manager.set_simv_current(current) - except DeviceDisconnectedError: - logger.error("Device disconnected during current set") - raise - - def _hi_z(self): - try: - self.dev_manager.hi_z() - except DeviceDisconnectedError: - logger.error("Device disconnected during hi-z") - raise - - def get_latest_measurement(self): - try: - q = self.dev_manager.measurement_thread.measurement_queue - item = q.get(timeout=self.voltage_timeout) - if isinstance(item, (tuple, list)): - if len(item) >= 3 and isinstance(item[0], str): - _, v, i, *rest = item; ts = rest[0] if rest else None; return v, i, ts - elif len(item) >= 2: - v = item[0]; i = item[1]; ts = item[2] if len(item)>=3 else None; return v, i, ts - return None, None, None - except Empty: - return None, None, None - except Exception: - return None, None, None - - def run(self): - try: - self._set_direction_on_measurement_thread(-1) - self._set_simv_current(-abs(self.discharge_current)) - while self._running: - v, i, ts = self.get_latest_measurement() - if v is None: - time.sleep(0.05); continue - if v <= self.cutoff_voltage: - break - time.sleep(0.1) - self.test_completed.emit() - except Exception as e: - self.error_occurred.emit(str(e)) - finally: - try: self._hi_z() - except: pass - self.finished.emit() - -class ChargeWorker(QThread): - finished = pyqtSignal() - update_status = pyqtSignal(str) - test_completed = pyqtSignal() - error_occurred = pyqtSignal(str) - - def __init__(self, device_manager, charge_current, cutoff_voltage, parent=None): - super().__init__(parent) - self.dev_manager = device_manager - self.device = device_manager.dev - self.charge_current = float(charge_current) - self.cutoff_voltage = float(cutoff_voltage) - self._running = True - self.voltage_timeout = 0.5 - - def stop(self): - self._running = False - - def _set_direction_on_measurement_thread(self, direction): - try: - mt = getattr(self.dev_manager, 'measurement_thread', None) - if mt and hasattr(mt, 'set_direction'): - mt.set_direction(direction) - except Exception: - pass - - def _set_simv_current(self, current): - try: - self.dev_manager.set_simv_current(current) - except DeviceDisconnectedError: - logger.error("Device disconnected during current set") - raise - - def _hi_z(self): - try: - self.dev_manager.hi_z() - except DeviceDisconnectedError: - logger.error("Device disconnected during hi-z") - raise - - def get_latest_measurement(self): - try: - q = self.dev_manager.measurement_thread.measurement_queue - item = q.get(timeout=self.voltage_timeout) - if isinstance(item, (tuple, list)): - if len(item) >= 3 and isinstance(item[0], str): - _, v, i, *rest = item; ts = rest[0] if rest else None; return v, i, ts - elif len(item) >= 2: - v = item[0]; i = item[1]; ts = item[2] if len(item)>=3 else None; return v, i, ts - return None, None, None - except Empty: - return None, None, None - except Exception: - return None, None, None - - def run(self): - try: - self._set_direction_on_measurement_thread(1) - self._set_simv_current(abs(self.charge_current)) - while self._running: - v, i, ts = self.get_latest_measurement() - if v is None: - time.sleep(0.05); continue - if v >= self.cutoff_voltage: - break - time.sleep(0.1) - self.test_completed.emit() - except Exception as e: - self.error_occurred.emit(str(e)) - finally: - try: self._hi_z() - except: pass - self.finished.emit() + def handle_cycle_test(self, voltage, current): + """Cycle test logic with state machine""" + if self.test_phase == "Discharging": + discharge_current = -abs(self.params['c_rate'] * self.params['capacity']) -class BatteryTester(QMainWindow): + if abs(current - discharge_current) > 0.01: + self.set_simv(discharge_current) + + if voltage <= self.params['discharge_cutoff'] or self.request_stop: + self.set_hiz() + self.test_phase = "Rest (Post-Discharge)" + self.rest_start_time = time.time() + + elif self.test_phase == "Rest (Post-Discharge)": + if time.time() - self.rest_start_time >= self.params['rest_time'] * 3600 or self.request_stop: + self.test_phase = "Charging" + + elif self.test_phase == "Charging": + charge_current = abs(self.params['c_rate'] * self.params['capacity']) + + if abs(current - charge_current) > 0.01: + self.set_simv(charge_current) + + if voltage >= self.params['charge_cutoff'] or self.request_stop: + self.set_hiz() + self.test_phase = "Rest (Post-Charge)" + self.rest_start_time = time.time() + + elif self.test_phase == "Rest (Post-Charge)": + if time.time() - self.rest_start_time >= self.params['rest_time'] * 3600 or self.request_stop: + self.cycle_count += 1 + + if self.params['continuous'] and not self.request_stop: + self.test_phase = "Discharging" + else: + self.test_running = False + self.test_phase = "Completed" + self.test_completed.emit(f"Device {self.dev_idx}") + + # Initial state for cycle test + elif self.test_phase in ["Starting", "Idle"]: + self.test_phase = "Discharging" + + def stop(self): + """Stop the worker thread""" + self.running = False + self.measuring = False + self.test_running = False + self.set_hiz() + if self.logging and self.log_file: + self.stop_logging() + +class BatteryTesterGUI(QMainWindow): def __init__(self): - self.plot_mutex = threading.Lock() super().__init__() - self.devices = {} - self.active_device = None - self.last_logged_phase = None - self.global_recording = False - self.debug_counter = 0 - self.last_debug_time = time.time() - - # Color scheme - MUST BE DEFINED FIRST + # Color scheme self.bg_color = "#2E3440" self.fg_color = "#D8DEE9" self.accent_color = "#5E81AC" self.warning_color = "#BF616A" self.success_color = "#A3BE8C" - # Status colors - MUST BE DEFINED BEFORE init_device() - self.status_colors = { - "connected": "green", - "disconnected": "red", - "error": "orange", - "active": self.accent_color, - "warning": self.warning_color - } - # Device and measurement state + self.session = None + self.devices = [] + self.workers = {} + self.threads = {} + self.current_device_idx = 0 self.session_active = False self.measuring = False - self.test_running = False - self.continuous_mode = False - self.request_stop = False - self.interval = 0.1 - self.log_dir = os.path.expanduser("~/adalm1000/logs") + self.log_dir = os.path.expanduser("~/battery_tester/logs") os.makedirs(self.log_dir, exist_ok=True) # Data buffers - self.aggregation_buffer = { - 'time': [], 'voltage': [], 'current': [], - 'count': 0, 'last_plot_time': 0 - } - self.phase_data = deque() - self.downsample_factor = 1 # Initial kein Downsampling - self.downsample_counter = 0 - - # Initialize all measurement variables + self.time_data = deque(maxlen=10000) + self.voltage_data = deque(maxlen=10000) + self.current_data = deque(maxlen=10000) + self.display_time_data = deque(maxlen=1000) + self.display_voltage_data = deque(maxlen=1000) + self.display_current_data = deque(maxlen=1000) + + # Initialize measurement variables self.capacity_ah = 0.0 - self.energy = 0.0 - self.charge_capacity = 0.0 - self.coulomb_efficiency = 0.0 + self.energy_wh = 0.0 self.cycle_count = 0 self.start_time = time.time() self.last_update_time = self.start_time - self.capacity = 1.0 - self.c_rate = 0.1 - self.charge_cutoff = 1.43 - self.discharge_cutoff = 0.01 - self.rest_time = 0.5 + # Default parameters + self.params = { + 'capacity': 1.0, + 'c_rate': 0.1, + 'charge_cutoff': 1.43, + 'discharge_cutoff': 0.01, + 'rest_time': 0.5, + 'continuous': True + } - # Initialize UI and device + # Initialize UI self.setup_ui() - self.init_device() self.current_mode = "Live Monitoring" self.change_mode(self.current_mode) # Set window properties - self.setWindowTitle("ADALM1000 - Battery Tester (Multi-Mode)") + self.setWindowTitle("ADALM1000 Battery Tester") self.resize(1000, 800) self.setMinimumSize(800, 700) # Status update timer self.status_timer = QTimer() self.status_timer.timeout.connect(self.update_status_and_plot) - self.status_timer.start(200) #every 200 ms + self.status_timer.start(UPDATE_INTERVAL) # Reduced update frequency + + # Initialize devices + self.init_devices() + self.update_button_colors() + + def init_devices(self): + """Initialize ADALM1000 devices""" + try: + self.session = pysmu.Session(ignore_dataflow=True, queue_size=20000) + self.devices = self.session.devices + + if not self.devices: + QMessageBox.warning(self, "No Devices", "No ADALM1000 devices found!") + return + + self.status_bar.showMessage(f"Found {len(self.devices)} device(s)") + self.session.start(0) + + # Create workers and threads for each device + for idx, dev in enumerate(self.devices): + worker = ADALM1000Worker(dev, idx) + thread = QThread() + worker.moveToThread(thread) + + # Connect signals + worker.data_ready.connect(self.update_data) + worker.status_update.connect(self.update_status) + worker.test_completed.connect(self.test_completed) + worker.error_occurred.connect(self.show_error) + worker.capacity_update.connect(self.update_capacity) + + thread.started.connect(worker.run) + self.workers[idx] = worker + self.threads[idx] = thread + + # Add device to combo box + self.device_combo.addItem(f"ADALM1000-{idx}") + + # Start threads + for thread in self.threads.values(): + thread.start() + + # Enable measurement for current device + self.enable_measurement(True) + self.session_active = True + + except Exception as e: + QMessageBox.critical(self, "Error", f"Device initialization failed: {str(e)}") + traceback.print_exc() + + def enable_measurement(self, enable): + """Enable/disable measurement for current device""" + if self.current_device_idx in self.workers: + self.workers[self.current_device_idx].measuring = enable - def print_device_status(self): - """Debug method to print current device states""" - for serial, device in self.devices.items(): - print(f"\nDevice: {serial}") - print(f"Active: {device == self.active_device}") - print(f"Start Time: {getattr(device, 'start_time', 'NOT SET')}") - print(f"Data Points: {len(device.time_data)}") - print(f"Last Voltage: {device.voltage_data[-1] if device.voltage_data else 'NONE'}") - print(f"Thread Running: {device.measurement_thread.isRunning() if hasattr(device, 'measurement_thread') else 'NO THREAD'}") - def setup_ui(self): - """Configure the user interface with all elements properly organized""" + """Configure the user interface""" # Main widget and layout self.central_widget = QWidget() self.setCentralWidget(self.central_widget) @@ -672,7 +450,7 @@ class BatteryTester(QMainWindow): self.main_layout.setContentsMargins(8, 8, 8, 8) self.main_layout.setSpacing(8) - # Base style for consistent sizing + # Base style base_style = f""" font-size: 10pt; color: {self.fg_color}; @@ -726,7 +504,7 @@ class BatteryTester(QMainWindow): min-height: 24px; }} """) - self.device_combo.currentIndexChanged.connect(self.change_device) + self.device_combo.currentIndexChanged.connect(self.device_changed) mode_layout.addWidget(self.device_combo, 1) self.main_layout.addWidget(mode_frame) @@ -744,28 +522,16 @@ class BatteryTester(QMainWindow): # Status indicator self.status_light = QLabel() self.status_light.setFixedSize(16, 16) - self.status_light.setStyleSheet("background-color: red; border-radius: 8px;") + self.status_light.setStyleSheet("background-color: green; border-radius: 8px;") header_layout.addWidget(self.status_light) self.connection_label = QLabel("Disconnected") self.connection_label.setStyleSheet(base_style) header_layout.addWidget(self.connection_label) - # Reconnect button - self.reconnect_btn = QPushButton("Reconnect") - self.reconnect_btn.setStyleSheet(f""" - {base_style} - background-color: #4C566A; - padding: 2px 8px; - border-radius: 3px; - min-height: 24px; - """) - self.reconnect_btn.clicked.connect(self.reconnect_device) - header_layout.addWidget(self.reconnect_btn) - self.main_layout.addWidget(header_frame) - # Measurement display - 4 rows x 2 columns + # Measurement display display_frame = QFrame() display_frame.setFrameShape(QFrame.StyledPanel) display_frame.setStyleSheet(f""" @@ -781,11 +547,11 @@ class BatteryTester(QMainWindow): display_layout.setVerticalSpacing(2) display_layout.setContentsMargins(5, 3, 5, 3) - # Measurement fields in exact order + # Measurement fields measurement_fields = [ ("Voltage", "V"), ("Current", "A"), ("Elapsed Time", "s"), ("Energy", "Wh"), - ("Test Phase", None), ("Capacity", "Ah"), # None for no unit + ("Test Phase", None), ("Capacity", "Ah"), ("Cycle Count", None), ("Coulomb Eff.", "%") ] @@ -793,19 +559,19 @@ class BatteryTester(QMainWindow): row = i // 2 col = (i % 2) * 2 - # Container for each measurement with fixed height + # Container for each measurement container = QWidget() - container.setFixedHeight(24) # Fixed row height + container.setFixedHeight(24) container_layout = QHBoxLayout(container) container_layout.setContentsMargins(2, 0, 2, 0) - container_layout.setSpacing(2) # Minimal spacing between elements + container_layout.setSpacing(2) - # Label (fixed width) + # Label lbl = QLabel(f"{label}:") lbl.setStyleSheet("min-width: 85px;") container_layout.addWidget(lbl) - # Value field (fixed width) + # Value field value_text = "0.000" if unit else ("Idle" if label == "Test Phase" else "0") value_lbl = QLabel(value_text) value_lbl.setAlignment(Qt.AlignRight) @@ -816,7 +582,7 @@ class BatteryTester(QMainWindow): """) container_layout.addWidget(value_lbl) - # Unit (only if exists) + # Unit if unit: unit_lbl = QLabel(unit) unit_lbl.setStyleSheet("min-width: 20px;") @@ -951,10 +717,6 @@ class BatteryTester(QMainWindow): # Single toggle button (Start/Stop) self.toggle_button = QPushButton("START") self.toggle_button.setCheckable(True) - self.toggle_button.setStyleSheet(button_style + f""" - background-color: {self.success_color}; - min-width: 120px; - """) self.toggle_button.clicked.connect(self.toggle_test) button_layout.addWidget(self.toggle_button) @@ -971,7 +733,7 @@ class BatteryTester(QMainWindow): self.record_button.setStyleSheet(button_style.replace( "background-color", "background-color", 1 ) + f"background-color: {self.success_color};") - self.record_button.clicked.connect(self.toggle_global_recording) + self.record_button.clicked.connect(self.toggle_recording) button_layout.addWidget(self.record_button) self.record_button.hide() @@ -996,118 +758,85 @@ class BatteryTester(QMainWindow): }} """) - def apply_button_style(self): - """Apply consistent button styling based on current state""" + def setup_plot(self): + """Configure the matplotlib plot""" + self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color) + self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) + self.ax = self.fig.add_subplot(111) + self.ax.set_facecolor('#3B4252') + + # Set initial voltage range + self.ax.set_ylim(0, 5.0) + + # Voltage plot + self.line_voltage, = self.ax.plot([0], [0], color='#00BFFF', label='Voltage (V)', linewidth=2) + self.ax.set_ylabel("Voltage (V)", color='#00BFFF') + self.ax.tick_params(axis='y', labelcolor='#00BFFF') + + # Current plot (right axis) + self.ax2 = self.ax.twinx() + self.ax2.set_ylim(-1.0, 1.0) + + self.line_current, = self.ax2.plot([0], [0], 'r-', label='Current (A)', linewidth=2) + self.ax2.set_ylabel("Current (A)", color='r') + self.ax2.tick_params(axis='y', labelcolor='r') + + self.ax.set_xlabel('Time (s)', color=self.fg_color) + self.ax.set_title('Battery Test', color=self.fg_color) + self.ax.tick_params(axis='x', colors=self.fg_color) + self.ax.grid(True, color='#4C566A') + + # Position legends + self.ax.legend(loc='upper left') + self.ax2.legend(loc='upper right') + + # Embed plot + self.canvas = FigureCanvas(self.fig) + self.canvas.setStyleSheet(f"background-color: {self.bg_color};") + self.main_layout.addWidget(self.canvas, 1) + + def update_button_colors(self): + """Update button colors based on state""" if self.toggle_button.isChecked(): - # Stop state - red self.toggle_button.setStyleSheet(f""" - QPushButton {{ - background-color: {self.warning_color}; - color: white; - font-weight: bold; - border: none; - border-radius: 4px; - padding: 4px 8px; - min-height: 28px; - }} - QPushButton:hover {{ - background-color: #{self.darker_color(self.warning_color)}; - }} + background-color: {self.warning_color}; + color: {self.fg_color}; + font-weight: bold; + padding: 4px 8px; + border-radius: 4px; + min-height: 28px; + min-width: 120px; """) else: - # Start state - green self.toggle_button.setStyleSheet(f""" - QPushButton {{ - background-color: {self.success_color}; - color: {self.fg_color}; - font-weight: bold; - border: none; - border-radius: 4px; - padding: 4px 8px; - min-height: 28px; - }} - QPushButton:hover {{ - background-color: #{self.darker_color(self.success_color)}; - }} + background-color: {self.success_color}; + color: {self.fg_color}; + font-weight: bold; + padding: 4px 8px; + border-radius: 4px; + min-height: 28px; + min-width: 120px; """) - - # Update record button separately + if self.record_button.isChecked(): self.record_button.setStyleSheet(f""" - QPushButton {{ - background-color: {self.warning_color}; - color: white; - font-weight: bold; - border: none; - border-radius: 4px; - padding: 4px 8px; - min-height: 28px; - }} + background-color: {self.warning_color}; + color: {self.fg_color}; + font-weight: bold; + padding: 4px 8px; + border-radius: 4px; + min-height: 28px; """) else: self.record_button.setStyleSheet(f""" - QPushButton {{ - background-color: {self.success_color}; - color: {self.fg_color}; - font-weight: bold; - border: none; - border-radius: 4px; - padding: 4px 8px; - min-height: 28px; - }} + background-color: {self.success_color}; + color: {self.fg_color}; + font-weight: bold; + padding: 4px 8px; + border-radius: 4px; + min-height: 28px; """) - def darker_color(self, hex_color): - """Helper to generate a darker shade for hover effects""" - if not hex_color.startswith('#'): - hex_color = '#' + hex_color - rgb = [int(hex_color[i:i+2], 16) for i in (1, 3, 5)] - darker = [max(0, c - 40) for c in rgb] - return ''.join([f"{c:02x}" for c in darker]) - - def toggle_global_recording(self): - """Toggle recording for all connected devices simultaneously""" - self.global_recording = not self.global_recording - - if self.global_recording: - # Start recording for all devices - for device in self.devices.values(): - device.is_recording = True - if not device.measurement_thread.isRunning(): - device.start_measurement(self.interval) - self.start_live_monitoring(device) - - self.record_button.setText("■ Stop Recording") - self.status_bar.showMessage("Recording started for all connected devices") - else: - # Stop recording for all devices - for device in self.devices.values(): - self.finalize_device_log_file(device) - device.is_recording = False - - self.record_button.setText("● Start Recording") - self.status_bar.showMessage("Recording stopped for all devices") - - self.apply_button_style() - - def safe_execute(func): - """Decorator to catch and log exceptions in Qt event handlers""" - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except Exception as e: - print(f"Error in {func.__name__}: {str(e)}") - traceback.print_exc() - return wrapper - - @safe_execute - def toggle_test(self, checked): - if checked: - self.start_test() - else: - self.stop_test() - self.apply_button_style() - def change_mode(self, mode_name): """Change between different test modes""" self.current_mode = mode_name @@ -1131,7 +860,7 @@ class BatteryTester(QMainWindow): # Record button only for live monitoring self.record_button.setVisible(mode_name == "Live Monitoring") - # Set button text based on mode and make sure it's visible for test modes + # Set button text based on mode if mode_name == "Cycle Test": self.toggle_button.setText("START CYCLE TEST") self.toggle_button.show() @@ -1142,1187 +871,232 @@ class BatteryTester(QMainWindow): self.toggle_button.setText("START CHARGE") self.toggle_button.show() elif mode_name == "Live Monitoring": - self.toggle_button.hide() # Only hide for Live Monitoring + self.toggle_button.hide() # Reset button state self.toggle_button.setChecked(False) self.toggle_button.setEnabled(True) - self.apply_button_style() - - # Reset measurement state and zero the time - if self.active_device: - dev = self.active_device - dev.reset_data() - - # Reset the measurement thread's start time - if hasattr(dev, 'measurement_thread'): - dev.measurement_thread.start_time = time.time() - - # Reset UI displays - self.capacity_label.setText("0.0000") - self.energy_label.setText("0.0000") - self.cycle_label.setText("0") - self.phase_label.setText("Idle") - #self.time_label.setText("00:00:00") - - # Reset plot - self.reset_plot() - self.status_bar.showMessage(f"Mode changed to {mode_name}") - - # Update recording button - if mode_name == "Live Monitoring": - self.record_button.setVisible(True) - if self.active_device and self.active_device.is_recording: - self.record_button.setChecked(True) - self.record_button.setText("Stop Recording") - else: - self.record_button.setChecked(False) - self.record_button.setText("Start Recording") - else: - self.record_button.setVisible(False) - - self.apply_button_style() - self.status_bar.showMessage(f"Mode changed to {mode_name}") - - def reset_test(self): - if not self.active_device: - return - - dev_manager = self.active_device - dev_manager.reset_data() # Reset in DeviceManager + # Reset measurement state + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + self.display_time_data.clear() + self.display_voltage_data.clear() + self.display_current_data.clear() - # Reset UI + # Reset UI displays self.capacity_label.setText("0.0000") self.energy_label.setText("0.0000") self.cycle_label.setText("0") self.phase_label.setText("Idle") - - def toggle_recording(self): - """Toggle data recording in Live Monitoring mode""" - if not self.active_device: - return - - dev = self.active_device - if not dev.is_recording: # Use device's recording state - try: - if self.create_cycle_log_file(): - dev.is_recording = True # Set device's recording state - self.record_button.setText("Stop Recording") - self.apply_button_style() # Update style - self.status_bar.showMessage("Live recording started") - # Ensure monitoring is running - if not self.test_running: - self.start_live_monitoring(self.active_device) - else: - self.record_button.setChecked(False) - self.current_cycle_file = None - except Exception as e: - print(f"Error starting recording: {e}") - self.record_button.setChecked(False) - self.current_cycle_file = None - QMessageBox.critical(self, "Error", f"Failed to start recording:\n{str(e)}") - else: - # Stop recording - try: - if hasattr(self, 'current_cycle_file') and self.current_cycle_file: - self.finalize_log_file() - dev.is_recording = False # Clear device's recording state - self.record_button.setText("Start Recording") - self.apply_button_style() # Update style - self.status_bar.showMessage("Live recording stopped") - except Exception as e: - print(f"Error stopping recording: {e}") - - def handle_continuous_mode_change(self, state): - """Handle changes to continuous mode checkbox during operation""" - if not state and self.test_running: # If unchecked during test - self.status_bar.showMessage("Continuous mode disabled - will complete current cycle") - self.continuous_mode_check.setStyleSheet(f"color: {self.warning_color};") - QTimer.singleShot(2000, lambda: self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")) - - def setup_plot(self): - """Configure the matplotlib plot""" - self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color) - self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) - self.ax = self.fig.add_subplot(111) - self.ax.set_facecolor('#3B4252') - - # Set initial voltage range - voltage_padding = 0.2 - min_voltage = max(0, 0.9 - voltage_padding) - max_voltage = 1.43 + voltage_padding - self.ax.set_ylim(min_voltage, max_voltage) - - # Voltage plot - self.line_voltage, = self.ax.plot([0], [0], color='#00BFFF', label='Voltage (V)', linewidth=2) - self.ax.set_ylabel("Voltage (V)", color='#00BFFF') - self.ax.tick_params(axis='y', labelcolor='#00BFFF') - - # Current plot (right axis) - self.ax2 = self.ax.twinx() - current_padding = 0.05 - test_current = 0.1 * 0.2 # Default values - max_current = test_current * 1.5 - self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) - - self.line_current, = self.ax2.plot([0], [0], 'r-', label='Current (A)', linewidth=2) - self.ax2.set_ylabel("Current (A)", color='r') - self.ax2.tick_params(axis='y', labelcolor='r') - - self.ax.set_xlabel('Time (s)', color=self.fg_color) - self.ax.set_title('Battery Test', color=self.fg_color) - self.ax.tick_params(axis='x', colors=self.fg_color) - self.ax.grid(True, color='#4C566A') - - # Position legends - self.ax.legend(loc='upper left') - self.ax2.legend(loc='upper right') - - # Embed plot - self.canvas = FigureCanvas(self.fig) - self.canvas.setStyleSheet(f"background-color: {self.bg_color};") - self.main_layout.addWidget(self.canvas, 1) - - def init_device(self): - """Robust device initialization""" - try: - # Close existing session - if hasattr(self, 'session'): - try: - self.session.end() - except: - pass - - # Create new session - self.session = Session(ignore_dataflow=True, queue_size=10000) - self.session.scan() - - # Retry mechanism - retry_count = 0 - while not self.session.devices and retry_count < 3: - self.handle_device_connection(False, f"Scanning... (Attempt {retry_count+1}/3)") - QApplication.processEvents() - time.sleep(1) - self.session.scan() - retry_count += 1 - - if not self.session.devices: - self.handle_device_connection(False, "No devices found") - return - - self.devices = {} - self.active_device = None - - for dev in self.session.devices: - serial = dev.serial if hasattr(dev, 'serial') else str(dev[0]) - logger.info(f"Found device: {serial}") - manager = DeviceManager(dev=dev, serial=serial) - self.devices[serial] = manager - - # Connect signals for all devices - manager.measurement_thread.update_signal.connect(self.update_measurements) - manager.measurement_thread.error_signal.connect(self.handle_device_error) - - # Select first device - first_serial = next(iter(self.devices.keys())) - self.active_device = self.devices[first_serial] - - if self.active_device: - if not self.active_device.measurement_thread.isRunning(): - self.active_device.measurement_thread.start() - - # Update UI - self.device_combo.clear() - for serial in self.devices: - self.device_combo.addItem(serial) - self.device_combo.setCurrentText(first_serial) - - self.session_active = True - self.handle_device_connection(True, f"Connected: {first_serial}") - self.toggle_button.setEnabled(True) - - # Connect measurement signals - self.measurement_thread = self.active_device.measurement_thread - self.measurement_thread.update_signal.connect(self.update_measurements) - self.measurement_thread.error_signal.connect(self.handle_device_error) - - # Initialize recording state - self.global_recording = False - self.record_button.setChecked(False) - self.record_button.setText("Start Recording") - - except Exception as e: - self.handle_device_connection(False, f"Initialization failed: {str(e)}") - - def handle_no_devices(self): - """Handle case when no devices are found""" - self.session_active = False - self.active_device = None - self.status_bar.showMessage("No ADALM1000 devices found") - self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") - self.toggle_button.setEnabled(False) - self.device_combo.clear() - - # Show reconnect button - self.reconnect_btn.setEnabled(True) - self.reconnect_btn.setVisible(True) - - def handle_device_connection(self, connected, message=None): - """Update connection status with proper coloring""" - if connected: - status = "connected" - if not message: - message = "Connected" - else: - status = "error" if "fail" in message.lower() else "disconnected" - if not message: - message = "Disconnected" - - color = self.status_colors.get(status, "orange") - self.connection_label.setText(message) - self.status_light.setStyleSheet(f""" - background-color: {color}; - border-radius: 10px; - min-width: 12px; - max-width: 12px; - min-height: 12px; - max-height: 12px; - """) - QApplication.processEvents() - - def check_connection(self): - """Periodically verify device connection""" - if not hasattr(self, 'session') or not self.session.devices: - self.handle_device_error("Device disconnected") - self.reconnect_device() - - def request_usb_permissions(self): - """Handle USB permission issues with user interaction""" - msg = QMessageBox(self) - msg.setIcon(QMessageBox.Critical) - msg.setWindowTitle("USB Permission Required") - msg.setText("Permission needed to access ADALM1000 devices") - msg.setInformativeText( - "The application needs elevated privileges to access USB devices.\n\n" - "Please choose an option:" - ) - - # Add buttons - sudo_button = msg.addButton("Run as Administrator", QMessageBox.ActionRole) - udev_button = msg.addButton("Fix Permissions", QMessageBox.ActionRole) - cancel_button = msg.addButton(QMessageBox.Cancel) - - msg.exec_() - - if msg.clickedButton() == sudo_button: - # Restart with sudo - QMessageBox.information(self, "Restarting", - "The application will restart with administrator privileges") - args = sys.argv[:] - args.insert(0, sys.executable) - os.execvp("sudo", ["sudo"] + args) - - elif msg.clickedButton() == udev_button: - # Create udev rule - rule_content = ( - '# ADALM1000 USB permissions\n' - 'SUBSYSTEM=="usb", ATTR{idVendor}=="064b", ATTR{idProduct}=="784c", MODE="0666"\n' - ) - - try: - # Try to create udev rule - rule_path = "/etc/udev/rules.d/52-adalm1000.rules" - with open(rule_path, "w") as f: - f.write(rule_content) - - # Apply rules - os.system("sudo udevadm control --reload-rules") - os.system("sudo udevadm trigger") - - QMessageBox.information(self, "Permissions Fixed", - "USB permissions configured. Please reconnect devices.") - except Exception as e: - QMessageBox.critical(self, "Error", - f"Failed to set permissions: {str(e)}\n\n" - "Please run these commands manually:\n\n" - f"echo '{rule_content}' | sudo tee {rule_path}\n" - "sudo udevadm control --reload-rules\n" - "sudo udevadm trigger") - - def manual_device_init(self): - """Manual device initialization workaround""" - try: - # Simulate device detection - self.device_combo.clear() - self.device_combo.addItem("ADALM1000-1 (Simulated)") - self.device_combo.addItem("ADALM1000-2 (Simulated)") - - # Mock connection - self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") - self.connection_label.setText("Simulated Devices") - self.session_active = True - self.toggle_button.setEnabled(True) - - QMessageBox.warning(self, "Simulation Mode", - "Using simulated devices - real hardware not detected") - except Exception as e: - print(f"Manual init failed: {e}") - - @safe_execute - def change_device(self, index): - """Handle switching between connected devices""" - if not self.session_active or index < 0: - return - - serial = self.device_combo.itemText(index) - if serial not in self.devices: - return - - # Store current device state before switching - old_device = self.active_device - - # Disconnect signals from old device's measurement thread - if old_device and old_device.measurement_thread: - try: - with self.plot_mutex: # Ensure thread-safe disconnection - old_device.measurement_thread.update_signal.disconnect(self.update_measurements) - old_device.measurement_thread.error_signal.disconnect(self.handle_device_error) - if old_device.is_recording: - self.finalize_device_log_file(old_device) - except (TypeError, RuntimeError): - # Signals weren't connected or already disconnected - safe to ignore - pass - - # Activate new device - self.active_device = self.devices[serial] - dev = self.active_device - - # Ensure measurement thread exists - if not hasattr(dev, 'measurement_thread'): - dev.measurement_thread = MeasurementThread(dev, self.interval, dev) - - # Get reference to new thread - new_thread = dev.measurement_thread - - # Connect signals for new device - with self.plot_mutex: - new_thread.update_signal.connect(self.update_measurements) - new_thread.error_signal.connect(self.handle_device_error) - - # Start measurement if not running (with thread safety) - if not new_thread.isRunning(): - new_thread.start() - - # Initialize device time tracking if needed - if not hasattr(dev, 'start_time'): - dev.start_time = time.time() - - # Update UI with current device data - self.update_ui_from_active_device() - - # Handle recording state transition - if old_device and old_device.is_recording: - # Finalize old device recording - self.finalize_device_log_file(old_device) - - # Set up recording for new device if global recording is enabled - if self.global_recording and not dev.is_recording: - self.start_live_monitoring(dev) - - # Update recording button state - self.record_button.setChecked(dev.is_recording) - self.record_button.setText("■ Stop Recording" if dev.is_recording else "● Start Recording") - self.apply_button_style() - - # Reset plot for new device + # Reset plot self.reset_plot() - # Update status - self.status_bar.showMessage(f"Switched to device: {serial}") - self.set_connection_status(f"Connected: {serial}", self.status_colors["connected"]) + self.status_bar.showMessage(f"Mode changed to {mode_name}") - if not self.active_device.measurement_thread.isRunning(): - self.active_device.measurement_thread.start() - - def update_ui_from_active_device(self): - dev = self.active_device - if not dev: - return - - with self.plot_mutex: - # Copy current data - x = list(dev.display_time_data) - y_v = list(dev.display_voltage_data) - y_c = list(dev.display_current_data) - - # Update plot - self.line_voltage.set_data(x, y_v) - self.line_current.set_data(x, y_c) + def device_changed(self, index): + """Handle device selection change""" + # Disable measurement for previous device + if self.current_device_idx in self.workers: + self.workers[self.current_device_idx].measuring = False - # Fix: Pass required arguments to auto_scale_axes - if x and y_v and y_c: # Only call if data exists - self.auto_scale_axes(x, y_v, y_c) + # Update current device index + self.current_device_idx = index + + # Enable measurement for new device + if self.current_device_idx in self.workers: + self.workers[self.current_device_idx].measuring = True + + # Reset data buffers + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() + self.display_time_data.clear() + self.display_voltage_data.clear() + self.display_current_data.clear() + + # Reset plot + self.reset_plot() + + self.status_bar.showMessage(f"Switched to device {index}") + + @pyqtSlot(float, float, float, str) + def update_data(self, elapsed, voltage, current, phase): + """Update data from worker thread""" + # Only update if a test is running or in live monitoring + if elapsed > 0 or self.current_mode == "Live Monitoring": + # Store data + self.time_data.append(elapsed) + self.voltage_data.append(voltage) + self.current_data.append(current) + # Update display buffers + self.display_time_data.append(elapsed) + self.display_voltage_data.append(voltage) + self.display_current_data.append(current) + + # Update UI + self.voltage_label.setText(f"{voltage:.4f}") + self.current_label.setText(f"{current:.4f}") + self.phase_label.setText(phase) + + @pyqtSlot(float, float, int) + def update_capacity(self, capacity_ah, energy_wh, cycle_count): + """Update capacity data from worker thread""" + self.capacity_ah = capacity_ah + self.energy_wh = energy_wh + self.cycle_count = cycle_count + self.capacity_label.setText(f"{capacity_ah:.4f}") + self.energy_label.setText(f"{energy_wh:.4f}") + self.cycle_label.setText(str(cycle_count)) + + @pyqtSlot(str, str) + def update_status(self, device, status): + """Update status from worker thread""" + self.status_bar.showMessage(f"{device}: {status}") + if device == f"Device {self.current_device_idx}": + self.phase_label.setText(status.split(":")[-1].strip()) + + @pyqtSlot(str) + def test_completed(self, device): + """Handle test completion""" + if device == f"Device {self.current_device_idx}": + self.toggle_button.setChecked(False) + self.update_button_colors() + self.status_bar.showMessage(f"{device}: Test completed") + + @pyqtSlot(str, str) + def show_error(self, device, error): + """Show error message from worker thread""" + QMessageBox.critical(self, f"{device} Error", error) + self.status_bar.showMessage(f"{device}: {error}") + + def toggle_test(self, checked): + """Toggle test start/stop""" + self.toggle_button.setChecked(checked) + self.update_button_colors() + + if checked: + self.start_test() + else: + self.stop_test() + + def start_test(self): + """Start the selected test mode""" + if self.current_device_idx not in self.workers: + QMessageBox.warning(self, "Error", "No active device selected!") + return + + try: + # Get parameters from UI + params = { + 'capacity': float(self.capacity_input.text()), + 'c_rate': float(self.c_rate_input.text()), + 'charge_cutoff': float(self.charge_cutoff_input.text()), + 'discharge_cutoff': float(self.discharge_cutoff_input.text()), + 'rest_time': float(self.rest_time_input.text()), + 'continuous': self.continuous_mode_check.isChecked() + } + + # Start test on worker + self.workers[self.current_device_idx].start_test( + self.current_mode, + params + ) + + # Update UI + self.toggle_button.setText("STOP") + self.status_bar.showMessage(f"{self.current_mode} started on device {self.current_device_idx}") + + except Exception as e: + QMessageBox.critical(self, "Error", f"Invalid parameters: {str(e)}") + self.stop_test() + + def stop_test(self): + """Stop the current test""" + if self.current_device_idx in self.workers: + self.workers[self.current_device_idx].stop_test() + + # Update UI + if self.current_mode == "Cycle Test": + self.toggle_button.setText("START CYCLE TEST") + elif self.current_mode == "Discharge Test": + self.toggle_button.setText("START DISCHARGE") + elif self.current_mode == "Charge Test": + self.toggle_button.setText("START CHARGE") + else: + self.toggle_button.setText("START") + + self.toggle_button.setChecked(False) + self.update_button_colors() + self.status_bar.showMessage("Test stopped") + + def toggle_recording(self, checked): + """Toggle data recording""" + self.record_button.setChecked(checked) + self.update_button_colors() + + if checked: + try: + # Create log file + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = os.path.join(self.log_dir, f"device{self.current_device_idx}_test_{timestamp}.csv") + + # Start logging on worker + if self.current_device_idx in self.workers: + if self.workers[self.current_device_idx].start_logging(filename): + self.record_button.setText("■ Stop Recording") + self.status_bar.showMessage("Recording started") + else: + self.record_button.setChecked(False) + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to start recording: {str(e)}") + self.record_button.setChecked(False) + else: + if self.current_device_idx in self.workers: + self.workers[self.current_device_idx].stop_logging() + self.record_button.setText("● Start Recording") + self.status_bar.showMessage("Recording stopped") + + def update_plot(self): + """Update the plot with current data""" + if not self.display_time_data: + return + + self.line_voltage.set_data(self.display_time_data, self.display_voltage_data) + self.line_current.set_data(self.display_time_data, self.display_current_data) + + # Auto-scale axes + if len(self.display_time_data) > 1: + self.ax.set_xlim(0, max(10, self.display_time_data[-1] * 1.05)) + + min_v = max(MIN_VOLTAGE, min(self.display_voltage_data) * 0.95) + max_v = min(MAX_VOLTAGE, max(self.display_voltage_data) * 1.05) + self.ax.set_ylim(min_v, max_v) + + min_c = min(self.display_current_data) * 1.1 + max_c = max(self.display_current_data) * 1.1 + self.ax2.set_ylim(min_c, max_c) + self.canvas.draw_idle() - # Update labels - if dev.voltage_data: - v = dev.voltage_data[-1] - i = dev.current_data[-1] - t = dev.time_data[-1] if dev.time_data else 0 - self.voltage_label.setText(f"{v:.4f}") - self.current_label.setText(f"{abs(i):.4f}") - self.time_label.setText(self.format_time(t)) - self.capacity_label.setText(f"{dev.capacity_ah:.4f}") - self.energy_label.setText(f"{dev.energy:.4f}") - self.cycle_label.setText(str(dev.cycle_count)) - self.phase_label.setText(dev.test_phase) - - @safe_execute - @pyqtSlot(str, float, float, float) - def update_measurements(self, serial, voltage, current, current_time): - """Debugged measurement update handler""" - try: - device = self.devices.get(serial) - if not device: - return - - # Ensure timing is initialized - if not hasattr(device, 'start_time') or device.start_time is None: - device.start_time = current_time - logger.debug(f"Initialized start time for {serial}") - - # Calculate elapsed time safely - try: - elapsed = current_time - device.start_time - except TypeError: - logger.error(f"Invalid timing - current: {current_time}, start: {device.start_time}") - device.start_time = current_time - elapsed = 0 - - # Validate measurements - if not (0 <= voltage <= 5.0) or not (-0.2 <= current <= 0.2): - logger.warning(f"Invalid values - V: {voltage:.3f}, I: {current:.3f}") - return - - with self.plot_mutex: - device.time_data.append(elapsed) - device.voltage_data.append(voltage) - device.current_data.append(current) - - device.display_time_data.append(elapsed) - device.display_voltage_data.append(voltage) - device.display_current_data.append(current) - - # Trim display buffers if needed - if len(device.display_time_data) > 1000: - device.display_time_data.popleft() - device.display_voltage_data.popleft() - device.display_current_data.popleft() - - # Only update UI for active device - if device == self.active_device: - self.voltage_label.setText(f"{voltage:.4f}") - self.current_label.setText(f"{abs(current):.4f}") - self.time_label.setText(self.format_time(elapsed)) - - # Calculate metrics if we have enough data - if len(device.time_data) > 1: - delta_t = device.time_data[-1] - device.time_data[-2] - device.capacity_ah += abs(current) * delta_t / 3600 - device.energy += (voltage * abs(current)) * delta_t / 3600 - self.capacity_label.setText(f"{device.capacity_ah:.4f}") - self.energy_label.setText(f"{device.energy:.4f}") - - except Exception as e: - logger.error(f"Update error: {str(e)}") - - def adjust_downsampling(self): - current_length = len(self.time_data) - if current_length > self.max_points_to_keep * 1.5: - # Exponentiell erhöhen, aber max. 64 - new_factor = min(64, max(1, self.downsample_factor * 2)) - elif current_length < self.max_points_to_keep // 2: - # Halbieren, aber min. 1 - new_factor = max(1, self.downsample_factor // 2) - else: - return - - if new_factor != self.downsample_factor: - self.downsample_factor = new_factor - self.status_bar.showMessage( - f"Downsampling: Factor {self.downsample_factor}", 2000) def update_status_and_plot(self): - """Combined status and plot update""" - self.update_status() - self.update_plot() - - def update_status(self): - """Update status information periodically""" - now = time.time() - - # Update all devices - for dev_manager in self.devices.values(): - # Only process if device has data - if not dev_manager.time_data: - continue - - # Update capacity calculation - if len(dev_manager.time_data) > 1: - idx = len(dev_manager.time_data) - 1 - delta_t = dev_manager.time_data[idx] - dev_manager.time_data[idx-1] - current_val = abs(dev_manager.current_data[idx]) - power = dev_manager.voltage_data[idx] * current_val - dev_manager.capacity_ah += current_val * delta_t / 3600 - dev_manager.energy += power * delta_t / 3600 - - # Write to log if recording - if dev_manager.is_recording and dev_manager.log_writer: - try: - if now - getattr(dev_manager, '_last_log_time', 0) >= 1.0: - dev_manager.log_writer.writerow([ - f"{dev_manager.time_data[-1]:.4f}", - f"{dev_manager.voltage_data[-1]:.6f}", - f"{dev_manager.current_data[-1]:.6f}", - "Live", - f"{dev_manager.capacity_ah:.4f}", - f"{power:.4f}", - f"{dev_manager.energy:.4f}" - ]) - dev_manager.log_file.flush() - dev_manager._last_log_time = now - except Exception as e: - print(f"Log write error for device {dev_manager.serial}: {e}") - dev_manager.is_recording = False - - # Update UI for active device - if self.active_device and self.active_device.time_data: - dev = self.active_device - self.capacity_label.setText(f"{dev.capacity_ah:.4f}") - self.energy_label.setText(f"{dev.energy:.4f}") - - # Update elapsed-time display - if self.active_device and self.active_device.time_data: - elapsed = self.active_device.time_data[-1] + """Periodic status update""" + if self.time_data: + elapsed = self.time_data[-1] self.time_label.setText(self.format_time(elapsed)) - - @safe_execute - def start_test(self): - """Start the selected test mode using the active device""" - if not self.active_device: - QMessageBox.warning(self, "No Device", "No ADALM1000 device selected.") - self.toggle_button.setChecked(False) - self.apply_button_style() - return - - dev_manager = self.active_device - dev = dev_manager.dev - - # Clean up any previous test - self.cleanup_test_threads() - - # Reset test state for active device - self.reset_test() - - # Reset measurement thread timing - dev_manager.measurement_thread.start_time = time.time() - dev_manager.measurement_thread.voltage_window.clear() - dev_manager.measurement_thread.current_window.clear() - with dev_manager.measurement_thread.measurement_queue.mutex: - dev_manager.measurement_thread.measurement_queue.queue.clear() - self.time_label.setText("00:00:00") - - # Reset data buffers - dev_manager.time_data.clear() - dev_manager.voltage_data.clear() - dev_manager.current_data.clear() - dev_manager.display_time_data.clear() - dev_manager.display_voltage_data.clear() - dev_manager.display_current_data.clear() - dev_manager.aggregation_buffer = { - 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0 - } - - # Reset device state - try: - dev.channels['A'].mode = pysmu.Mode.HI_Z - dev.channels['A'].constant(0) - dev.channels['B'].mode = pysmu.Mode.HI_Z - dev.channels['B'].constant(0) - except Exception as e: - QMessageBox.critical(self, "Device Error", f"Failed to reset device: {e}") - self.toggle_button.setChecked(False) - self.apply_button_style() - return - - # Reset test variables - dev_manager.capacity_ah = 0.0 - dev_manager.energy = 0.0 - dev_manager.charge_capacity = 0.0 - dev_manager.coulomb_efficiency = 0.0 - dev_manager.cycle_count = 0 - dev_manager.start_time = time.time() - dev_manager.test_phase = "Running" - - # Set global state - self.test_running = True - self.request_stop = False - - # Update UI - self.phase_label.setText(dev_manager.test_phase) - self.toggle_button.setText("STOP") - self.apply_button_style() - - # Get parameters from UI - try: - self.capacity = float(self.capacity_input.text()) - self.c_rate = float(self.c_rate_input.text()) - test_current = self.c_rate * self.capacity - if test_current > 0.2: - raise ValueError("Current must be ≤ 200mA (0.2A) for ADALM1000") - except ValueError as e: - QMessageBox.critical(self, "Input Error", str(e)) - self.stop_test() - return - - # Create log file - using start_live_monitoring instead of _start_device_recording - if not self.start_live_monitoring(self.active_device): - self.stop_test() - return - - # Start the appropriate test - if self.current_mode == "Cycle Test": - try: - self.charge_cutoff = float(self.charge_cutoff_input.text()) - self.discharge_cutoff = float(self.discharge_cutoff_input.text()) - self.rest_time = float(self.rest_time_input.text()) - if self.charge_cutoff <= self.discharge_cutoff: - raise ValueError("Charge cutoff must be higher than discharge cutoff") - - # Start test sequence - self.test_sequence_thread = QThread() - self.test_sequence_worker = TestSequenceWorker( - self.active_device, - test_current, - self.charge_cutoff, - self.discharge_cutoff, - self.rest_time, - self.continuous_mode_check.isChecked(), - self - ) - self.test_sequence_worker.moveToThread(self.test_sequence_thread) - - self.test_sequence_worker.update_phase.connect(self.update_test_phase) - self.test_sequence_worker.update_status.connect(self.status_bar.showMessage) - self.test_sequence_worker.test_completed.connect(self.finalize_test) - self.test_sequence_worker.error_occurred.connect(self.handle_test_error) - self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit) - self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater) - self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater) - - self.test_sequence_thread.start() - QTimer.singleShot(0, self.test_sequence_worker.run) - - self.status_bar.showMessage(f"Cycle test started | Device: {dev.serial} | Current: {test_current:.4f}A") - - except Exception as e: - QMessageBox.critical(self, "Error", str(e)) - self.stop_test() - - elif self.current_mode == "Discharge Test": - try: - self.discharge_cutoff = float(self.discharge_cutoff_input.text()) - - self.discharge_thread = QThread() - self.discharge_worker = DischargeWorker(self.active_device, test_current, self.discharge_cutoff, self) - self.discharge_worker.moveToThread(self.discharge_thread) - - self.discharge_worker.update_status.connect(self.status_bar.showMessage) - self.discharge_worker.test_completed.connect(self.finalize_test) - self.discharge_worker.error_occurred.connect(self.handle_test_error) - self.discharge_worker.finished.connect(self.discharge_thread.quit) - self.discharge_worker.finished.connect(self.discharge_worker.deleteLater) - self.discharge_thread.finished.connect(self.discharge_thread.deleteLater) - - self.discharge_thread.start() - QTimer.singleShot(0, self.discharge_worker.run) - - self.status_bar.showMessage(f"Discharge test started | Device: {dev.serial} | Current: {test_current:.4f}A") - - except Exception as e: - QMessageBox.critical(self, "Error", str(e)) - self.stop_test() - - elif self.current_mode == "Charge Test": - try: - self.charge_cutoff = float(self.charge_cutoff_input.text()) - - self.charge_thread = QThread() - self.charge_worker = ChargeWorker(self.active_device, test_current, self.charge_cutoff, self) - self.charge_worker.moveToThread(self.charge_thread) - - self.charge_worker.update_status.connect(self.status_bar.showMessage) - self.charge_worker.test_completed.connect(self.finalize_test) - self.charge_worker.error_occurred.connect(self.handle_test_error) - self.charge_worker.finished.connect(self.charge_thread.quit) - self.charge_worker.finished.connect(self.charge_worker.deleteLater) - self.charge_thread.finished.connect(self.charge_thread.deleteLater) - - self.charge_thread.start() - QTimer.singleShot(0, self.charge_worker.run) - - self.status_bar.showMessage(f"Charge test started | Device: {dev.serial} | Current: {test_current:.4f}A") - - except Exception as e: - QMessageBox.critical(self, "Error", str(e)) - self.stop_test() - - elif self.current_mode == "Live Monitoring": - # Ensure we pass the device parameter - self.start_live_monitoring(self.active_device) - - def start_cycle_test(self): - """Start the battery cycle test""" - # Clean up any previous test - if hasattr(self, 'test_sequence_worker'): - try: - self.test_sequence_worker.stop() - except: - pass - self.test_sequence_worker.deleteLater() - if hasattr(self, 'test_sequence_thread'): - self.test_sequence_thread.quit() - self.test_sequence_thread.wait() - self.test_sequence_thread.deleteLater() - del self.test_sequence_thread - self.reset_test() - self.reset_plot() - if hasattr(self, 'measurement_thread'): - self.measurement_thread.start_time = time.time() - - # Reset stop flag - self.request_stop = False - - if not self.test_running: - try: - # Get parameters from UI - self.capacity = float(self.capacity_input.text()) - self.charge_cutoff = float(self.charge_cutoff_input.text()) - self.discharge_cutoff = float(self.discharge_cutoff_input.text()) - self.rest_time = float(self.rest_time_input.text()) - self.c_rate = float(self.c_rate_input.text()) - - # Validate inputs - if self.capacity <= 0: - raise ValueError("Battery capacity must be positive") - if self.charge_cutoff <= self.discharge_cutoff: - raise ValueError("Charge cutoff must be higher than discharge cutoff") - if self.c_rate <= 0: - raise ValueError("C-rate must be positive") - - test_current = self.c_rate * self.capacity - if test_current > 0.2: - raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") - - # Clear ALL previous data completely - with self.plot_mutex: - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - self.phase_data.clear() - - # Reset capacities and timing - self.start_time = time.time() - self.last_update_time = self.start_time - self.capacity_ah = 0.0 - self.charge_capacity = 0.0 - self.coulomb_efficiency = 0.0 - self.cycle_count = 0 - self.energy = 0.0 - - # Reset measurement thread's timer and queues - if hasattr(self, 'measurement_thread'): - self.measurement_thread.start_time = time.time() - self.measurement_thread.voltage_window.clear() - self.measurement_thread.current_window.clear() - with self.measurement_thread.measurement_queue.mutex: - self.measurement_thread.measurement_queue.queue.clear() - - # Reset plot completely - self.reset_plot() - - # Start test - self.test_running = True - self.start_time = time.time() - self.last_update_time = time.time() - self.test_phase = "Initial Discharge" - self.phase_label.setText(self.test_phase) - - self.toggle_button.setChecked(True) - self.toggle_button.setText("STOP") - self.apply_button_style() - self.status_bar.showMessage(f"Cycle test started | Current: {test_current:.4f}A") - - # Create log file - self.create_cycle_log_file() - - # Start test sequence in a QThread - self.test_sequence_thread = QThread() - self.test_sequence_worker = TestSequenceWorker( - self.active_device.dev, - test_current, - self.charge_cutoff, - self.discharge_cutoff, - self.rest_time, - self.continuous_mode_check.isChecked(), - self # Pass reference to main window for callbacks - ) - self.test_sequence_worker.moveToThread(self.test_sequence_thread) - - # Connect signals - self.test_sequence_worker.update_phase.connect(self.update_test_phase) - self.test_sequence_worker.update_status.connect(self.status_bar.showMessage) - self.test_sequence_worker.test_completed.connect(self.finalize_test) - self.test_sequence_worker.error_occurred.connect(self.handle_test_error) - self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit) - self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater) - self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater) - - # Start the thread and the worker's run method - self.test_sequence_thread.start() - QTimer.singleShot(0, self.test_sequence_worker.run) - - except Exception as e: - QMessageBox.critical(self, "Error", str(e)) - # Ensure buttons are in correct state if error occurs - self.toggle_button.setChecked(False) - self.toggle_button.setText("START") - self.apply_button_style() - self.toggle_button.setEnabled(True) - - def start_discharge_test(self): - """Start the battery discharge test""" - # Clean up any previous test - self.reset_test() # löscht time_data, voltage_data, current_data, display_*, phase_data - self.reset_plot() - if hasattr(self, 'measurement_thread'): - self.measurement_thread.start_time = time.time() - if hasattr(self, 'discharge_worker'): - try: - self.discharge_worker.stop() - except: - pass - self.discharge_worker.deleteLater() - if hasattr(self, 'discharge_thread'): - self.discharge_thread.quit() - self.discharge_thread.wait() # warte unbegrenzt, bis er wirklich fertig ist - self.discharge_thread.deleteLater() - del self.discharge_thread - # Reset stop flag - self.request_stop = False - - if not self.test_running: - try: - # Get parameters from UI - self.capacity = float(self.capacity_input.text()) - self.discharge_cutoff = float(self.discharge_cutoff_input.text()) - self.c_rate = float(self.c_rate_input.text()) - - # Validate inputs - if self.capacity <= 0: - raise ValueError("Battery capacity must be positive") - if self.c_rate <= 0: - raise ValueError("C-rate must be positive") - - test_current = self.c_rate * self.capacity - if test_current > 0.2: - raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") - - # Clear ALL previous data completely - with self.plot_mutex: - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - - # Reset capacities and timing - self.start_time = time.time() - self.last_update_time = self.start_time - self.capacity_ah = 0.0 - self.energy = 0.0 - self.cycle_count = 1 - - # Reset measurement thread's timer and queues - if hasattr(self, 'measurement_thread'): - self.measurement_thread.start_time = time.time() - self.measurement_thread.voltage_window.clear() - self.measurement_thread.current_window.clear() - with self.measurement_thread.measurement_queue.mutex: - self.measurement_thread.measurement_queue.queue.clear() - - # Reset plot completely - self.reset_plot() - - # Start test - self.test_running = True - self.start_time = time.time() - self.last_update_time = time.time() - self.test_phase = "Discharge" - self.phase_label.setText(self.test_phase) - - self.toggle_button.setChecked(True) - self.toggle_button.setText("STOP") - self.apply_button_style() - self.status_bar.showMessage(f"Discharge started | Current: {test_current:.4f}A") - - # Create log file - self.create_cycle_log_file() - - # Start discharge worker in a QThread - self.discharge_thread = QThread() - self.discharge_worker = DischargeWorker( - self.active_device.dev, - test_current, - self.discharge_cutoff, - self # Pass reference to main window for callbacks - ) - self.discharge_worker.moveToThread(self.discharge_thread) - - # Connect signals - self.discharge_worker.update_status.connect(self.status_bar.showMessage) - self.discharge_worker.test_completed.connect(self.finalize_test) - self.discharge_worker.error_occurred.connect(self.handle_test_error) - self.discharge_worker.finished.connect(self.discharge_thread.quit) - self.discharge_worker.finished.connect(self.discharge_worker.deleteLater) - self.discharge_thread.finished.connect(self.discharge_thread.deleteLater) - - # Start the thread and the worker's run method - self.discharge_thread.start() - QTimer.singleShot(0, self.discharge_worker.run) - - except Exception as e: - QMessageBox.critical(self, "Error", str(e)) - # Ensure buttons are in correct state if error occurs - self.toggle_button.setChecked(False) - self.toggle_button.setText("START") - self.apply_button_style() - self.toggle_button.setEnabled(True) - - def start_charge_test(self): - """Start the battery charge test""" - # Clean up any previous test - if hasattr(self, 'charge_worker'): - try: - self.charge_worker.stop() - except: - pass - self.charge_worker.deleteLater() - if hasattr(self, 'charge_thread'): - self.charge_thread.quit() - self.charge_thread.wait() - self.charge_thread.deleteLater() - del self.charge_thread - self.reset_test() - self.reset_plot() - if hasattr(self, 'measurement_thread'): - self.measurement_thread.start_time = time.time() + self.update_plot() - # Reset stop flag - self.request_stop = False - - if not self.test_running: - try: - # Get parameters from UI - self.capacity = float(self.capacity_input.text()) - self.charge_cutoff = float(self.charge_cutoff_input.text()) - self.c_rate = float(self.c_rate_input.text()) - - # Validate inputs - if self.capacity <= 0: - raise ValueError("Battery capacity must be positive") - if self.c_rate <= 0: - raise ValueError("C-rate must be positive") - - test_current = self.c_rate * self.capacity - if test_current > 0.2: - raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") - - # Clear ALL previous data completely - with self.plot_mutex: - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - - # Reset capacities and timing - self.start_time = time.time() - self.last_update_time = self.start_time - self.capacity_ah = 0.0 - self.energy = 0.0 - self.cycle_count = 1 - - # Reset measurement thread - if hasattr(self, 'measurement_thread'): - self.measurement_thread.start_time = time.time() - self.measurement_thread.voltage_window.clear() - self.measurement_thread.current_window.clear() - with self.measurement_thread.measurement_queue.mutex: - self.measurement_thread.measurement_queue.queue.clear() - - # Reset plot - self.reset_plot() - - # Start test - self.test_running = True - self.start_time = time.time() - self.last_update_time = time.time() - self.test_phase = "Charge" - self.phase_label.setText(self.test_phase) - - self.toggle_button.setChecked(True) - self.toggle_button.setText("STOP") - self.apply_button_style() - self.status_bar.showMessage(f"Charge started @ {test_current:.3f}A to {self.charge_cutoff}V") - - # Create log file - self.create_cycle_log_file() - - # Start charge worker in a QThread - self.charge_thread = QThread() - self.charge_worker = ChargeWorker( - self.active_device.dev, - test_current, - self.charge_cutoff, - self - ) - self.charge_worker.moveToThread(self.charge_thread) - - # Connect signals - self.charge_worker.update_status.connect(self.status_bar.showMessage) - self.charge_worker.test_completed.connect(self.finalize_test) - self.charge_worker.error_occurred.connect(self.handle_test_error) - self.charge_worker.finished.connect(self.charge_thread.quit) - self.charge_worker.finished.connect(self.charge_worker.deleteLater) - self.charge_thread.finished.connect(self.charge_thread.deleteLater) - - # Start the thread - self.charge_thread.start() - QTimer.singleShot(0, self.charge_worker.run) - - except Exception as e: - QMessageBox.critical(self, "Error", str(e)) - self.toggle_button.setChecked(False) - self.toggle_button.setText("START") - self.apply_button_style() - self.toggle_button.setEnabled(True) - - def start_live_monitoring(self, device): - """Initialize logging for a specific device (replaces create_device_log_file)""" - try: - if device.is_recording: - return True # Already recording - - # Ensure log directory exists - os.makedirs(device.log_dir, exist_ok=True) - if not os.access(device.log_dir, os.W_OK): - QMessageBox.critical(self, "Error", - f"No write permissions in {device.log_dir}") - return False - - # Generate filename with device serial and timestamp - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - clean_serial = device.serial.replace(":", "_")[-8:] # Clean for filename - filename = os.path.join( - device.log_dir, - f"battery_test_{clean_serial}_{timestamp}.csv" - ) - - # Open file and initialize writer - device.log_file = open(filename, 'w', newline='') - device.log_writer = csv.writer(device.log_file) - - # Write comprehensive header - device.log_file.write(f"# ADALM1000 Battery Test Log\n") - device.log_file.write(f"# Device Serial: {device.serial}\n") - device.log_file.write(f"# Start Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - device.log_file.write(f"# Test Mode: {self.current_mode}\n") - - if hasattr(self, 'capacity_input'): - device.log_file.write(f"# Battery Capacity: {self.capacity_input.text()} Ah\n") - - # Mode-specific parameters - if self.current_mode == "Cycle Test": - device.log_file.write(f"# Charge Cutoff: {self.charge_cutoff_input.text()} V\n") - device.log_file.write(f"# Discharge Cutoff: {self.discharge_cutoff_input.text()} V\n") - device.log_file.write(f"# Rest Time: {self.rest_time_input.text()} h\n") - - device.log_file.write("#\n") # End of header - - # Write column headers - if self.current_mode == "Cycle Test": - device.log_writer.writerow([ - "Time(s)", "Voltage(V)", "Current(A)", "Phase", - "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", - "Coulomb_Eff(%)", "Cycle" - ]) - else: - device.log_writer.writerow([ - "Time(s)", "Voltage(V)", "Current(A)", "Phase", - "Capacity(Ah)", "Power(W)", "Energy(Wh)" - ]) - - device.is_recording = True - device._last_log_time = 0 - return True - - except Exception as e: - error_msg = f"Failed to start recording for {device.serial}:\n{str(e)}" - print(error_msg) - QMessageBox.critical(self, "Recording Error", error_msg) - if device.log_file: - try: - device.log_file.close() - except: - pass - device.log_file = None - device.log_writer = None - return False - - def finalize_device_log_file(self, device): - """Properly finalize and close a device's log file""" - if not device.is_recording or not device.log_file: - return - - try: - # Write test summary footer - device.log_file.write("\n# TEST SUMMARY\n") - device.log_file.write(f"# End Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - - if device.time_data: - duration = device.time_data[-1] if device.time_data else 0 - device.log_file.write(f"# Duration: {self.format_time(duration)}\n") - device.log_file.write(f"# Final Voltage: {device.voltage_data[-1]:.4f} V\n") - device.log_file.write(f"# Final Current: {device.current_data[-1]:.4f} A\n") - - device.log_file.write(f"# Total Capacity: {device.capacity_ah:.6f} Ah\n") - device.log_file.write(f"# Total Energy: {device.energy:.6f} Wh\n") - - if self.current_mode == "Cycle Test": - device.log_file.write(f"# Cycles Completed: {device.cycle_count}\n") - if device.charge_capacity > 0: - device.log_file.write(f"# Coulomb Efficiency: {device.coulomb_efficiency:.2f}%\n") - - except Exception as e: - print(f"Error writing footer for {device.serial}: {str(e)}") - finally: - try: - device.log_file.close() - except: - pass - device.log_file = None - device.log_writer = None - device.is_recording = False + # Update connection status + if self.session_active: + self.status_light.setStyleSheet("background-color: green; border-radius: 8px;") + self.connection_label.setText(f"Connected ({len(self.devices)} devices)") + else: + self.status_light.setStyleSheet("background-color: red; border-radius: 8px;") + self.connection_label.setText("Disconnected") def format_time(self, seconds): """Convert seconds to hh:mm:ss format""" @@ -2331,637 +1105,37 @@ class BatteryTester(QMainWindow): seconds = int(seconds % 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" - @safe_execute - def stop_test(self): - """Request immediate stop with proper visual feedback""" - # Immediate red button feedback - self.toggle_button.setStyleSheet(f""" - QPushButton {{ - background-color: {self.status_colors["warning"]}; - color: white; - font-weight: bold; - }} - """) - QApplication.processEvents() - - if not self.test_running: - self.reset_button_state() - return - - try: - # Stop operations - self.request_stop = True - self.test_running = False - self.measuring = False - - # Stop workers - workers = ['test_sequence_worker', 'discharge_worker', 'charge_worker'] - for worker in workers: - if hasattr(self, worker): - getattr(self, worker).stop() - - # Reset device - if self.active_device: - self.active_device.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.active_device.dev.channels['A'].constant(0) - self.handle_device_connection(True) # Confirm connection - - except Exception as e: - self.handle_device_connection(False, e) - finally: - # Clean up - self.reset_test_data() - self.reset_plot() - self.reset_button_state() - - if hasattr(self, 'current_cycle_file'): - self.finalize_log_file() - - self.status_bar.showMessage("Test stopped") - - def set_connection_status(self, text, color=None): - """Update connection status with optional color""" - if color is None: - if "error" in text.lower(): - color = self.status_colors["error"] - elif "disconnect" in text.lower(): - color = self.status_colors["disconnected"] - else: - color = self.status_colors["connected"] - - self.connection_label.setText(text) - self.status_light.setStyleSheet(f""" - background-color: {color}; - border-radius: 10px; - min-width: 12px; - max-width: 12px; - min-height: 12px; - max-height: 12px; - """) - QApplication.processEvents() - - def reset_button_state(self): - """Reset button to appropriate default state""" - mode = getattr(self, 'current_mode', 'Live Monitoring') - text = { - 'Cycle Test': "START CYCLE TEST", - 'Discharge Test': "START DISCHARGE", - 'Charge Test': "START CHARGE" - }.get(mode, "START") - - self.toggle_button.setChecked(False) - self.toggle_button.setText(text) - self.apply_button_style() - self.toggle_button.setStyleSheet(f""" - QPushButton {{ - background-color: {self.success_color}; - color: {self.fg_color}; - font-weight: bold; - }} - QPushButton:checked {{ - background-color: {self.warning_color}; - }} - QPushButton:disabled {{ - background-color: #4C566A; - }} - """) - - def reset_button_style(self): - """Reset button to default style""" - mode = getattr(self, 'current_mode', 'Live Monitoring') - text = { - 'Cycle Test': "START CYCLE TEST", - 'Discharge Test': "START DISCHARGE", - 'Charge Test': "START CHARGE", - }.get(mode, "START") - - self.toggle_button.setText(text) - self.toggle_button.setStyleSheet(f""" - QPushButton {{ - background-color: {self.success_color}; - color: {self.fg_color}; - font-weight: bold; - }} - QPushButton:checked {{ - background-color: {self.warning_color}; - }} - """) + def reset_plot(self): + """Reset the plot to initial state""" + self.line_voltage.set_data([], []) + self.line_current.set_data([], []) + self.ax.set_xlim(0, 10) + self.ax.set_ylim(0, 5.0) + self.ax2.set_ylim(-1.0, 1.0) + self.canvas.draw_idle() def closeEvent(self, event): - """Ensure clean shutdown""" - self.stop_test() + """Clean up on window close""" + # Stop all workers + for worker in self.workers.values(): + worker.stop() + + # Stop all threads + for thread in self.threads.values(): + thread.quit() + thread.wait() - # Stop all timers - self.status_timer.stop() - - # Close all log files - for device in self.devices.values(): - if device.is_recording: - self.finalize_device_log_file(device) - - # Clean up threads - if hasattr(self, 'measurement_thread') and self.measurement_thread: - self.measurement_thread.stop() - self.measurement_thread.wait(1000) + # End session + if self.session: + try: + self.session.end() + except: + pass event.accept() - def reset_test_data(self): - if not self.active_device: - return - - dev = self.active_device - # Clear all data buffers - dev.time_data.clear() - dev.voltage_data.clear() - dev.current_data.clear() - - # Display-Daten ebenfalls zurücksetzen - dev.display_time_data.clear() - dev.display_voltage_data.clear() - dev.display_current_data.clear() - - # Reset statistics - dev.capacity_ah = 0.0 - dev.energy = 0.0 - dev.charge_capacity = 0.0 - dev.coulomb_efficiency = 0.0 - dev.test_phase = "Idle" - - # Reset UI displays - self.voltage_label.setText("0.000") - self.current_label.setText("0.000") - #self.time_label.setText("00:00:00") - self.capacity_label.setText("0.0000") - self.energy_label.setText("0.0000") - self.phase_label.setText("Idle") - - def finalize_test(self): - """Final cleanup after test completes or is stopped""" - try: - # 1. Stop any active measurement or test operations - self.measuring = False - self.test_running = False - - # 2. Reset device to safe state - try: - self.active_device.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.active_device.dev.channels['A'].constant(0) - self.active_device.dev.channels['B'].mode = pysmu.Mode.HI_Z - self.active_device.dev.channels['B'].constant(0) - except Exception as e: - print(f"Error resetting device in finalize: {e}") - - # 3. Clean up test sequence thread safely - if hasattr(self, 'test_sequence_thread'): - try: - if self.test_sequence_thread.isRunning(): - if hasattr(self, 'test_sequence_worker'): - try: - self.test_sequence_worker.stop() - except RuntimeError: - pass - - self.test_sequence_thread.quit() - self.test_sequence_thread.wait(500) - except Exception as e: - print(f"Error stopping test sequence thread: {e}") - finally: - if hasattr(self, 'test_sequence_worker'): - try: - if not sip.isdeleted(self.test_sequence_worker): - self.test_sequence_worker.deleteLater() - except: - pass - - if hasattr(self, 'test_sequence_thread'): - try: - if not sip.isdeleted(self.test_sequence_thread): - self.test_sequence_thread.deleteLater() - except: - pass - finally: - if hasattr(self, 'test_sequence_thread'): - del self.test_sequence_thread - - # 4. Clean up discharge thread safely - if hasattr(self, 'discharge_thread'): - try: - if self.discharge_thread.isRunning(): - if hasattr(self, 'discharge_worker'): - try: - self.discharge_worker.stop() - except RuntimeError: - pass - - self.discharge_thread.quit() - self.discharge_thread.wait(500) - except Exception as e: - print(f"Error stopping discharge thread: {e}") - finally: - if hasattr(self, 'discharge_worker'): - try: - if not sip.isdeleted(self.discharge_worker): - self.discharge_worker.deleteLater() - except: - pass - - if hasattr(self, 'discharge_thread'): - try: - if not sip.isdeleted(self.discharge_thread): - self.discharge_thread.deleteLater() - except: - pass - finally: - if hasattr(self, 'discharge_thread'): - del self.discharge_thread - - # 5. Clean up charge thread safely (using same pattern as discharge thread) - if hasattr(self, 'charge_thread'): - try: - if self.charge_thread.isRunning(): - if hasattr(self, 'charge_worker'): - try: - self.charge_worker.stop() - except RuntimeError: - pass - - self.charge_thread.quit() - self.charge_thread.wait(500) - except Exception as e: - print(f"Error stopping charge thread: {e}") - finally: - if hasattr(self, 'charge_worker'): - try: - if not sip.isdeleted(self.charge_worker): - self.charge_worker.deleteLater() - except: - pass - - if hasattr(self, 'charge_thread'): - try: - if not sip.isdeleted(self.charge_thread): - self.charge_thread.deleteLater() - except: - pass - finally: - if hasattr(self, 'charge_thread'): - del self.charge_thread - - # 6. Finalize log file - self.finalize_log_file() - - # 7. Reset UI and state - self.request_stop = False - self.toggle_button.setChecked(False) - self.toggle_button.setText("START") - self.apply_button_style() - self.toggle_button.setEnabled(True) - self.apply_button_style() # Add this line - self.test_running = False - - # 8. Show completion message if test wasn't stopped by user - if not self.request_stop: - test_current = self.c_rate * self.capacity - test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A" - - if self.current_mode == "Cycle Test": - message = ( - f"Cycle test completed | " - f"Cycle {self.cycle_count} | " - f"Capacity: {self.capacity_ah:.4f}Ah | " - f"Efficiency: {self.coulomb_efficiency:.1f}%" - ) - - QMessageBox.information( - self, - "Test Completed", - f"Cycle test completed successfully.\n\n" - f"Test Parameters:\n" - f"- Capacity: {self.capacity} Ah\n" - f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n" - f"- Charge Cutoff: {self.charge_cutoff} V\n" - f"- Discharge Cutoff: {self.discharge_cutoff} V\n" - f"- Conditions: {test_conditions}\n\n" - f"Results:\n" - f"- Cycles: {self.cycle_count}\n" - f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n" - f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%" - ) - elif self.current_mode == "Discharge Test": - message = ( - f"Discharge completed | " - f"Capacity: {self.capacity_ah:.4f}Ah | " - f"Energy: {self.energy:.4f}Wh" - ) - - QMessageBox.information( - self, - "Discharge Completed", - f"Discharge test completed successfully.\n\n" - f"Test Parameters:\n" - f"- Capacity: {self.capacity} Ah\n" - f"- Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n" - f"- Discharge Cutoff: {self.discharge_cutoff} V\n" - f"- Conditions: {test_conditions}\n\n" - f"Results:\n" - f"- Discharge capacity: {self.capacity_ah:.4f}Ah\n" - f"- Energy delivered: {self.energy:.4f}Wh" - ) - - self.status_bar.showMessage(message) - - except Exception as e: - print(f"Error in finalize_test: {e}") - import traceback - traceback.print_exc() - # Ensure we don't leave the UI in a locked state - self.toggle_button.setChecked(False) - self.toggle_button.setText("START") - self.apply_button_style() - self.toggle_button.setEnabled(True) - self.status_bar.showMessage("Error during test finalization") - - def reset_plot(self): - """Completely reset the plot to initial state""" - # Clear plot data - self.line_voltage.set_data([], []) - self.line_current.set_data([], []) - - # Reset axes - self.ax.set_xlim(0, 10) - self.ax.set_ylim(0, 5.0) # Full voltage range - self.ax2.set_ylim(-0.25, 0.25) # Full current range - - # Redraw with slight delay to ensure UI updates - QTimer.singleShot(50, self.canvas.draw_idle) - - def update_status_and_plot(self): - """Combined status and plot update""" - self.update_status() - self.update_plot() - - def update_plot(self): - if not self.active_device: - return - dev = self.active_device - - # Get minimal data under lock - with self.plot_mutex: - if not dev.display_time_data: - return - # Copy only the last 1000 points - x_data = list(dev.display_time_data)[-1000:] - y1_data = list(dev.display_voltage_data)[-1000:] - y2_data = list(dev.display_current_data)[-1000:] - - # Update plot data outside lock - self.line_voltage.set_data(x_data, y1_data) - self.line_current.set_data(x_data, y2_data) - - # Auto-scale only when needed - if len(x_data) > 1 and x_data[-1] - x_data[0] > 1.0: - self.auto_scale_axes(x_data, y1_data, y2_data) - - self.canvas.draw_idle() - - def auto_scale_axes(self, x_data, y1_data, y2_data): - """Safe auto-scaling that handles empty data""" - if not x_data: - return - - try: - # Voltage scaling - voltage_padding = 0.2 - min_voltage = max(0, min(y1_data) - voltage_padding) - max_voltage = min(5.0, max(y1_data) + voltage_padding) - - # Current scaling - current_padding = 0.05 - min_current = max(-0.25, min(y2_data) - current_padding) - max_current = min(0.25, max(y2_data) + current_padding) - - # Apply new limits - self.ax.set_xlim(min(x_data), max(x_data) * 1.05) - self.ax.set_ylim(min_voltage, max_voltage) - self.ax2.set_ylim(min_current, max_current) - except ValueError: # Handle empty sequences - pass - - @pyqtSlot(str) - def handle_device_error(self, error_msg): - """Handle device errors with proper connection status""" - logger.error(f"Device error: {error_msg}") - self.set_connection_status(f"Error: {error_msg}", "orange") - self.reconnect_btn.setVisible(True) - self.reconnect_btn.setEnabled(True) - self.toggle_button.setEnabled(False) - - # Attempt automatic recovery - QTimer.singleShot(2000, self.reconnect_device) - - def validate_measurements(self, voltage, current): - """Filter out invalid measurements""" - # Fix negative values caused by connection issues - if voltage < 0 or not (0 <= voltage <= 5.0): - return False - if abs(current) > 0.3: # Beyond ADALM1000's ±200mA range - return False - return True - - @pyqtSlot(str) - def update_test_phase(self, phase_text): - """Update the test phase display""" - self.test_phase = phase_text - self.phase_label.setText(phase_text) - - @pyqtSlot(str) - def handle_test_error(self, error_msg): - """Handle errors from the test sequence with complete cleanup""" - try: - # 1. Notify user - QMessageBox.critical(self, "Test Error", - f"An error occurred:\n{error_msg}\n\nAttempting to recover...") - - # 2. Stop all operations - self.stop_test() - - # 3. Reset UI elements - if hasattr(self, 'line_voltage'): - try: - self.line_voltage.set_data([], []) - self.line_current.set_data([], []) - self.ax.set_xlim(0, 1) - self.ax2.set_xlim(0, 1) - self.canvas.draw() - except Exception as plot_error: - print(f"Plot reset error: {plot_error}") - - # 4. Update status - self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...") - self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") - - # 5. Attempt recovery - QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect - - except Exception as e: - print(f"Error in error handler: {e}") - # Fallback - restart application? - QMessageBox.critical(self, "Fatal Error", - "The application needs to restart due to an unrecoverable error") - QTimer.singleShot(1000, self.close) - - def attempt_reconnect(self): - """Attempt to reconnect automatically""" - QMessageBox.critical( - self, - "Device Connection Error", - "Could not connect to ADALM1000\n\n" - "1. Check USB cable connection\n" - "2. The device will attempt to reconnect automatically" - ) - - QTimer.singleShot(1000, self.reconnect_device) - - def cleanup_test_threads(self): - """Clean up any existing test threads before starting a new test""" - # Stop and clean up test sequence thread if it exists - if hasattr(self, 'test_sequence_thread'): - try: - if hasattr(self, 'test_sequence_worker'): - self.test_sequence_worker.stop() - self.test_sequence_thread.quit() - self.test_sequence_thread.wait(500) - except Exception as e: - print(f"Error cleaning up test sequence thread: {e}") - finally: - if hasattr(self, 'test_sequence_worker'): - try: - self.test_sequence_worker.deleteLater() - except: - pass - if hasattr(self, 'test_sequence_thread'): - try: - self.test_sequence_thread.deleteLater() - except: - pass - - # Stop and clean up discharge thread if it exists - if hasattr(self, 'discharge_thread'): - try: - if hasattr(self, 'discharge_worker'): - self.discharge_worker.stop() - self.discharge_thread.quit() - self.discharge_thread.wait(500) - except Exception as e: - print(f"Error cleaning up discharge thread: {e}") - finally: - if hasattr(self, 'discharge_worker'): - try: - self.discharge_worker.deleteLater() - except: - pass - if hasattr(self, 'discharge_thread'): - try: - self.discharge_thread.deleteLater() - except: - pass - - # Stop and clean up charge thread if it exists - if hasattr(self, 'charge_thread'): - try: - if hasattr(self, 'charge_worker'): - self.charge_worker.stop() - self.charge_thread.quit() - self.charge_thread.wait(500) - except Exception as e: - print(f"Error cleaning up charge thread: {e}") - finally: - if hasattr(self, 'charge_worker'): - try: - self.charge_worker.deleteLater() - except: - pass - if hasattr(self, 'charge_thread'): - try: - self.charge_thread.deleteLater() - except: - pass - - def reconnect_device(self): - """Comprehensive device reconnection handler""" - try: - self.handle_device_connection(False, "Reconnecting...") - - # 1. Clean up existing connections - if hasattr(self, 'measurement_thread') and self.measurement_thread: - try: - self.measurement_thread.stop() - if not self.measurement_thread.wait(500): - self.measurement_thread.terminate() - except Exception as e: - print(f"Error stopping measurement thread: {e}") - - if hasattr(self, 'session') and self.session: - try: - self.session.end() - except Exception as e: - print(f"Error ending session: {e}") - - # 2. Initialize new session - self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) - self.session.scan() - - if not self.session.devices: - self.handle_device_connection(False, "No devices detected") - return - - self.session.start(0) - - # 3. Re-establish device connection - if hasattr(self, 'active_device') and self.active_device: - # Try to find the same device by serial - target_serial = self.active_device.serial - for dev in self.session.devices: - if dev.serial == target_serial: - # Recreate the DeviceManager - self.active_device = DeviceManager(dev) - self.devices[target_serial] = self.active_device - break - else: - # No previous device, just use first available - dev = self.session.devices[0] - self.active_device = DeviceManager(dev) - self.devices[dev.serial] = self.active_device - - # 4. Restart measurement system - self.active_device.start_measurement(self.interval) - - # Reconnect signals - self.measurement_thread = self.active_device.measurement_thread - self.measurement_thread.update_signal.connect(self.update_measurements) - self.measurement_thread.error_signal.connect(self.handle_device_error) - - # 5. Update UI - self.handle_device_connection(True, f"Reconnected: {self.active_device.serial}") - self.toggle_button.setEnabled(True) - - # Update device dropdown - self.device_combo.clear() - for serial in self.devices: - self.device_combo.addItem(serial) - self.device_combo.setCurrentText(self.active_device.serial) - - except Exception as e: - self.handle_device_connection(False, f"Reconnect failed: {str(e)}") - if __name__ == "__main__": app = QApplication([]) - try: - window = BatteryTester() - window.show() - app.exec_() - except Exception as e: - import traceback - traceback.print_exc() - QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}") \ No newline at end of file + window = BatteryTesterGUI() + window.show() + app.exec_() \ No newline at end of file