From 084d1be8fa72a003050e7cb6147ff0d11be7730b Mon Sep 17 00:00:00 2001 From: Jan Date: Fri, 8 Aug 2025 12:57:53 +0200 Subject: [PATCH] MainCode/adalm1000_logger.py aktualisiert fast! D --- MainCode/adalm1000_logger.py | 1071 +++++++++++++++------------------- 1 file changed, 462 insertions(+), 609 deletions(-) diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py index 714ff36..02c0978 100644 --- a/MainCode/adalm1000_logger.py +++ b/MainCode/adalm1000_logger.py @@ -8,6 +8,9 @@ from datetime import datetime import numpy as np import matplotlib import subprocess +import logging +import sys +from usb.core import USBError matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure @@ -20,303 +23,203 @@ from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QH from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread from PyQt5.QtGui import QDoubleValidator from PyQt5 import sip +from PyQt5.QtCore import pyqtSignal import pysmu from pysmu import Session +import matplotlib as mpl +mpl.rcParams['font.family'] = 'sans-serif' +mpl.rcParams['font.sans-serif'] = ['DejaVu Sans', 'Arial', 'Liberation Sans', 'Verdana'] # Fallback fonts +mpl.rcParams['axes.edgecolor'] = '#D8DEE9' +mpl.rcParams['text.color'] = '#D8DEE9' +mpl.rcParams['axes.labelcolor'] = '#D8DEE9' +mpl.rcParams['xtick.color'] = '#D8DEE9' +mpl.rcParams['ytick.color'] = '#D8DEE9' + +logging.getLogger('matplotlib.font_manager').setLevel(logging.WARNING) +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler("adalm_debug.log", mode="a") + ] +) +logger = logging.getLogger("adalm") + +class DeviceDisconnectedError(Exception): + pass + class DeviceManager: - def __init__(self, dev): + def __init__(self, session=None, dev=None, serial=None): + self.session = session self.dev = dev - self.serial = dev.serial + self.serial = serial self.measurement_thread = None - self.is_running = False + self.lock = threading.RLock() # reentrant lock + self._last_open_attempt = 0 + logger.info(f"Created DeviceManager for {serial}") + mpl.font_manager._get_font.cache_clear() + self.measurement_thread = MeasurementThread(self, 0.1, self) + + # Initialize data buffers and statistics + self.time_data = deque(maxlen=10000) + self.voltage_data = deque(maxlen=10000) + self.current_data = deque(maxlen=10000) + self.display_time_data = deque(maxlen=1000) + self.display_voltage_data = deque(maxlen=1000) + self.display_current_data = deque(maxlen=1000) + self.capacity_ah = 0.0 + self.energy = 0.0 + self.charge_capacity = 0.0 + self.coulomb_efficiency = 0.0 + self.cycle_count = 0 + self.test_phase = "Idle" + + # Add these new attributes for recording state self.is_recording = False self.log_file = None self.log_writer = None self._last_log_time = 0 self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) - self.session = None - - # Datenpuffer - max_data_points = 36000 - self.time_data = deque(maxlen=max_data_points) - self.voltage_data = deque(maxlen=max_data_points) - self.current_data = deque(maxlen=max_data_points) - self.display_time_data = deque(maxlen=10000) - self.display_voltage_data = deque(maxlen=10000) - self.display_current_data = deque(maxlen=10000) - - # Testzustand - self.capacity_ah = 0.0 - self.energy = 0.0 - self.charge_capacity = 0.0 - self.coulomb_efficiency = 0.0 - self.cycle_count = 0 - self.test_phase = "Idle" - self.start_time = time.time() - self.plot_mutex = threading.Lock() - - # Logging - self.current_cycle_file = None - self.log_writer = None - - # Downsampling - self.downsample_factor = 1 - self.aggregation_buffer = { - 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0 - } - - self.consecutive_read_errors = 0 # Track read failures - self.max_consecutive_errors = 5 # Threshold before reset - - self.status_colors = { - "connected": "green", - "disconnected": "red", - "error": "orange", - } - - def handle_read_error(self, increment=1): - """Enhanced device recovery with proper USB resource handling""" - self.consecutive_read_errors += increment - - if self.consecutive_read_errors >= self.max_consecutive_errors: - try: - print(f"Attempting device recovery (errors: {self.consecutive_read_errors})...") - - # 1. First try soft reset - try: - if hasattr(self.dev, 'reset'): - print("Attempting soft reset...") - self.dev.reset() - time.sleep(1.5) # Increased delay for reset to complete - - # Verify reset worked - try: - samples = self.dev.read(1, 500, True) - if samples: - self.consecutive_read_errors = 0 - print("Soft reset successful") - return True - except Exception as e: - print(f"Verification after soft reset failed: {e}") - except Exception as e: - print(f"Soft reset failed: {e}") - - # 2. Full reinitialization with USB reset - print("Attempting full USB reset...") - try: - # Close existing session cleanly - if hasattr(self, 'session') and self.session: - try: - print("Ending existing session...") - self.session.end() - time.sleep(0.5) - except Exception as e: - print(f"Error ending session: {e}") - - # Add delay and attempt USB reset - time.sleep(2.0) # Longer delay for USB to settle - - # Try to release USB resources using libusb - try: - import usb.core - # Find all ADALM1000 devices - devices = usb.core.find(find_all=True, idVendor=0x064b, idProduct=0x784c) - for dev in devices: - try: - print(f"Releasing USB device: {dev}") - usb.util.dispose_resources(dev) - except Exception as e: - print(f"Error releasing USB device: {e}") - time.sleep(1.0) - except ImportError: - print("pyusb not available, skipping resource release") - - # Try multiple times to scan for devices - new_session = None - for attempt in range(3): - try: - print(f"Scan attempt {attempt + 1}/3...") - new_session = pysmu.Session(ignore_dataflow=True, queue_size=10000) - devices = new_session.scan() - - if not devices: - print(f"No devices found on attempt {attempt+1}/3") - time.sleep(1.0) - continue - - # Find our device by serial - new_dev = None - for d in devices: - if d.serial == self.serial: - new_dev = d - break - - if new_dev: - print(f"Found device {self.serial} on attempt {attempt+1}") - self.dev = new_dev - self.consecutive_read_errors = 0 - self.session = new_session # Update session reference - - # Restart measurement thread - if hasattr(self, 'measurement_thread'): - try: - self.measurement_thread.stop() - time.sleep(0.5) - except: - pass - self.start_measurement(self.interval) - - print("Device reinitialized successfully") - return True - else: - print(f"Original device not found on attempt {attempt+1}") - - except Exception as e: - print(f"Scan attempt {attempt+1} failed: {e}") - time.sleep(1.0) - - print("Failed to find original device after 3 attempts") - return False - - except Exception as e: - print(f"Full reinitialization failed: {e}") - # Continue to final fallback even if this fails - pass - except Exception as e: - print(f"Device recovery failed: {e}") - # Continue to final fallback - pass - - # 3. Final fallback - try USB reset command - try: - print("Attempting USB port reset...") - # ADALM1000 USB IDs - vendor_id = "064b" - product_id = "784c" - - try: - # Linux-specific reset - result = subprocess.run(['usbreset', f'{vendor_id}:{product_id}'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True) - - if result.returncode == 0: - print("USB reset command executed successfully, waiting 3 seconds...") - time.sleep(3.0) - return self.handle_read_error(0) # Retry with counter reset - else: - print(f"USB reset command failed with code {result.returncode}") - print(f"Error output: {result.stderr}") - - except FileNotFoundError: - # Windows alternative - try: - print("usbreset not found, trying Windows USB reset") - # Use devcon utility for Windows - result = subprocess.run(['devcon', 'restart', f'USB\VID_{vendor_id}&PID_{product_id}'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True) - - if result.returncode == 0: - print("USB device reset via devcon, waiting 3 seconds...") - time.sleep(3.0) - return self.handle_read_error(0) - else: - print(f"devcon reset failed with code {result.returncode}") - print(f"Error output: {result.stderr}") - - except FileNotFoundError: - # Python-based reset as final fallback - print("devcon not found, attempting Python USB reset") - try: - # Try to import USB module - try: - import usb.core - except ImportError: - print("pyusb not installed, please install with: pip install pyusb") - return False - - # Find device by vendor and product ID - dev = usb.core.find(idVendor=0x064b, idProduct=0x784c) - if dev is not None: - try: - # Reset the USB device - dev.reset() - print("USB device reset via pyusb, waiting 3 seconds...") - time.sleep(3.0) - return self.handle_read_error(0) - except usb.core.USBError as e: - print(f"USB reset error: {e}") - else: - print("USB device not found for reset") - except Exception as e: - print(f"Python USB reset failed: {e}") - - except Exception as e: - print(f"USB reset command failed: {e}") - import traceback - traceback.print_exc() - - return True # Not enough errors yet to trigger recovery - - def start_measurement(self, interval=0.1): - self.stop_measurement() # Ensure any existing thread is stopped - self.measurement_thread = MeasurementThread(self.dev, interval, self) - self.measurement_thread.start() - self.is_running = True - - def stop_measurement(self): - if self.measurement_thread: - try: - # Disconnect signals first - try: - self.measurement_thread.update_signal.disconnect() - self.measurement_thread.error_signal.disconnect() - except (RuntimeError, TypeError): - pass - - # Then stop the thread - self.measurement_thread.stop() - if not self.measurement_thread.wait(500): - self.measurement_thread.terminate() - self.measurement_thread = None - except Exception as e: - print(f"Error stopping measurement thread: {e}") - self.is_running = False def reset_data(self): + """Reset all data buffers and statistics for the device""" self.time_data.clear() self.voltage_data.clear() self.current_data.clear() self.display_time_data.clear() self.display_voltage_data.clear() self.display_current_data.clear() - self.aggregation_buffer = { - 'time': [], 'voltage': [], 'current': [], 'count': 0, 'last_plot_time': 0 - } self.capacity_ah = 0.0 self.energy = 0.0 self.charge_capacity = 0.0 self.coulomb_efficiency = 0.0 self.cycle_count = 0 - self.start_time = time.time() self.test_phase = "Idle" + # Reset recording state + self.is_recording = False + self._last_log_time = 0 -class DeviceDisconnectedError(Exception): - pass + def safe_call(self, fn, *args, **kwargs): + """Call fn under device lock and handle USB errors.""" + with self.lock: + try: + return fn(*args, **kwargs) + except (USBError, OSError, pysmu.exceptions.USBError) as e: + logger.exception(f"USB error in safe_call for {self.serial}") + raise DeviceDisconnectedError(f"USB error: {e}") from e + except Exception as e: + logger.exception(f"Error in safe_call for {self.serial}") + raise + + def set_simv_current(self, current): + """Set channel A to SIMV and apply current.""" + def _do(): + chA = self.dev.channels['A'] + if hasattr(chA, 'Mode'): + try: + chA.mode = chA.Mode.SIMV + except Exception: + logger.debug("Couldn't set chA.mode to SIMV") + if hasattr(chA, 'constant'): + chA.constant(abs(current)) + else: + try: + chA.set_current(abs(current)) + except Exception: + logger.debug("No channel.constant or set_current available") + logger.debug(f"Device {self.serial}: set_simv_current {current}") + return self.safe_call(_do) + + def hi_z(self): + def _do(): + chA = self.dev.channels['A'] + if hasattr(chA, 'Mode'): + try: + chA.mode = chA.Mode.HI_Z + except Exception: + logger.debug("Couldn't set chA.mode to HI_Z") + if hasattr(chA, 'constant'): + chA.constant(0) + logger.debug(f"Device {self.serial}: hi_z()") + return self.safe_call(_do) + + def close(self): + """Close session/device references safely.""" + with self.lock: + try: + if self.dev: + try: + if hasattr(self.dev, 'close'): + self.dev.close() + except Exception: + logger.exception(f"Error closing device {self.serial}") + self.dev = None + if self.session: + try: + if hasattr(self.session, 'close'): + self.session.close() + except Exception: + logger.exception(f"Error closing session for {self.serial}") + self.session = None + except Exception: + logger.exception(f"Error in DeviceManager.close for {self.serial}") + + def reopen_with_backoff(self, max_attempts=5, base_delay=1.0): + """Try to reopen the device with exponential backoff.""" + now = time.time() + if now - self._last_open_attempt < 0.2: + time.sleep(0.2) + self._last_open_attempt = now + + for attempt in range(1, max_attempts + 1): + try: + logger.info(f"Attempting to reopen device {self.serial} (attempt {attempt}/{max_attempts})") + from pysmu import Session + s = Session() + devs = s.list_devices() + chosen = None + for d in devs: + try: + if hasattr(d, 'serial') and d.serial == self.serial: + chosen = s.open(d.serial) + break + if isinstance(d, (list, tuple)) and str(d[0]) == str(self.serial): + chosen = s.open(str(d[0])) + break + except Exception: + continue + if chosen is None and devs: + logger.warning(f"Could not find serial {self.serial}; opening first device") + chosen = s.open(devs[0].serial if hasattr(devs[0], 'serial') else str(devs[0][0])) + if chosen: + with self.lock: + try: + if self.dev and hasattr(self.dev, 'close'): + self.dev.close() + except Exception: + logger.exception("Error closing previous dev") + self.session = s + self.dev = chosen + logger.info(f"Reopened device {self.serial} OK") + return True + except Exception as e: + logger.exception(f"Reopen attempt {attempt} failed for {self.serial}: {e}") + time.sleep(base_delay * attempt) + logger.error(f"Failed to reopen device {self.serial} after {max_attempts} attempts") + return False class MeasurementThread(QThread): - update_signal = pyqtSignal(float, float, float) + update_signal = pyqtSignal(str, float, float, float) error_signal = pyqtSignal(str) - def __init__(self, device, interval, parent_manager): + def __init__(self, device_manager, interval, parent_manager): super().__init__() - self.device = device + self.dev_manager = device_manager self.interval = interval - self._running = False + self._running = True self.filter_window_size = 10 self.voltage_window = [] self.current_window = [] @@ -328,401 +231,325 @@ class MeasurementThread(QThread): def stop(self): self._running = False try: - self.device.channels['A'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].constant(0) + self.dev_manager.hi_z() except Exception as e: - print(f"Error stopping device: {e}") - self.wait(500) # Wait for thread to finish - - def is_measuring(self): - return self.measurement_thread is not None and self.measurement_thread.isRunning() + logger.error(f"Error stopping device: {e}") + self.wait(500) def run(self): - """Measurement loop with enhanced recovery""" - self._running = True - self.start_time = time.time() - + if not self.dev_manager.dev: + self.error_signal.emit("Device not initialized") + return + logger.info(f"MeasurementThread for {self.parent_manager.serial} starting") while self._running: try: - samples = self.device.read(self.filter_window_size, 500, True) - - # --- Handle empty samples --- - if not samples: - self.parent_manager.consecutive_read_errors += 1 - if self.parent_manager.consecutive_read_errors >= self.parent_manager.max_consecutive_errors: - # Attempt device reset through parent manager - if hasattr(self, 'parent_manager'): - if not self.parent_manager.handle_read_error(): - raise DeviceDisconnectedError("Persistent read failures") - time.sleep(0.1) - continue - - # ✅ Reset error counter on successful read - self.parent_manager.consecutive_read_errors = 0 - - # --- Process samples --- - current_time = time.time() - self.start_time - - # Get voltage from Channel B (HI_Z mode) and current from Channel A - raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage - raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction # Channel A current with direction - - # Update filter windows - self.voltage_window.append(raw_voltage) - self.current_window.append(raw_current) - - if len(self.voltage_window) > self.filter_window_size: - self.voltage_window.pop(0) - self.current_window.pop(0) - - voltage = np.mean(self.voltage_window) - current = np.mean(self.current_window) - - # Validate measurements - if not (-0.2 <= voltage <= 5.0): - raise ValueError(f"Voltage out of range: {voltage:.4f}V") - if not (-0.25 <= current <= 0.25): - raise ValueError(f"Current out of range: {current:.4f}A") - - # Emit update - self.update_signal.emit(voltage, current, current_time) - - # Store measurement + # Get measurement under lock + voltage, current, ts = self.dev_manager.safe_call(self._do_measure_once) + self.measurement_queue.put((self.parent_manager.serial, voltage, current, ts)) + self.update_signal.emit(self.parent_manager.serial, voltage, current, ts) + except DeviceDisconnectedError: + logger.error(f"Device {self.parent_manager.serial} disconnected during measurement") + self.error_signal.emit(f"Device {self.parent_manager.serial} disconnected") + # Attempt reconnect try: - self.measurement_queue.put_nowait((voltage, current)) - except Full: - pass - - time.sleep(max(0.05, self.interval)) - - except DeviceDisconnectedError as e: - self.error_signal.emit(f"Device disconnected: {str(e)}") - if not self.parent_manager.handle_read_error(): - break - time.sleep(1) - + logger.info("Attempting to reconnect...") + ok = self.dev_manager.reopen_with_backoff() + if ok: + logger.info(f"Reconnected to {self.parent_manager.serial}") + self.error_signal.emit(f"Reconnected to {self.parent_manager.serial}") + else: + logger.error(f"Permanent failure for {self.parent_manager.serial}") + self.error_signal.emit(f"Permanent failure for {self.parent_manager.serial}") + break + except Exception as e: + logger.exception(f"Reconnect failed: {e}") + time.sleep(2.0) except Exception as e: - self.error_signal.emit(f"Read error: {str(e)}") - if not self.parent_manager.handle_read_error(): - break - time.sleep(1) + logger.exception(f"Measurement error: {e}") + time.sleep(0.5) + time.sleep(self.interval) + logger.info(f"MeasurementThread for {self.parent_manager.serial} exiting") + + def _do_measure_once(self): + """Actual measurement code protected by DeviceManager lock""" + # This will be called inside DeviceManager.safe_call + samples = self.dev_manager.dev.get_samples(1) + if samples: + sample = samples[0] + voltage = sample[0][0] # Channel A voltage + current = sample[0][1] # Channel A current + return voltage, current, time.time() + return 0, 0, time.time() def set_direction(self, direction): - """Set current direction (1 for source, -1 for sink)""" self.current_direction = direction -class TestSequenceWorker(QObject): +class TestSequenceWorker(QThread): finished = pyqtSignal() update_phase = pyqtSignal(str) update_status = pyqtSignal(str) test_completed = pyqtSignal() error_occurred = pyqtSignal(str) - - def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent): - super().__init__() - self.device = device - self.test_current = test_current - self.charge_cutoff = charge_cutoff - self.discharge_cutoff = discharge_cutoff - self.rest_time = rest_time * 3600 # Convert hours to seconds - self.continuous_mode = continuous_mode + + def __init__(self, device_manager, test_current, charge_cutoff, discharge_cutoff, + rest_time_hours, continuous_mode, parent=None): + super().__init__(parent) + self.dev_manager = device_manager # DeviceManager instance + self.device = device_manager.dev # raw pysmu device for direct calls + self.test_current = float(test_current) + self.charge_cutoff = float(charge_cutoff) + self.discharge_cutoff = float(discharge_cutoff) + self.rest_time = float(rest_time_hours) * 3600.0 + self.continuous_mode = bool(continuous_mode) self.parent = parent self._running = True - self.voltage_timeout = 0.5 # seconds + self.voltage_timeout = 0.5 + + def stop(self): + self._running = False + + def _set_direction_on_measurement_thread(self, direction): + try: + mt = getattr(self.dev_manager, 'measurement_thread', None) + if mt and hasattr(mt, 'set_direction'): + mt.set_direction(direction) + except Exception: + pass + + def _set_simv_current(self, current): + try: + self.dev_manager.set_simv_current(current) + except DeviceDisconnectedError: + logger.error("Device disconnected during current set") + raise + + def _hi_z(self): + try: + self.dev_manager.hi_z() + except DeviceDisconnectedError: + logger.error("Device disconnected during hi-z") + raise def get_latest_measurement(self): - """Thread-safe measurement reading with timeout""" try: - return self.parent.measurement_thread.measurement_queue.get( - timeout=self.voltage_timeout - ) + q = self.dev_manager.measurement_thread.measurement_queue + item = q.get(timeout=self.voltage_timeout) + # support multiple formats + if isinstance(item, (tuple, list)): + if len(item) >= 3 and isinstance(item[0], str): + _, v, i, *rest = item + ts = rest[0] if rest else None + return v, i, ts + elif len(item) >= 2: + v = item[0]; i = item[1]; ts = item[2] if len(item) >= 3 else None + return v, i, ts + return None, None, None except Empty: - return (None, None) # Return tuple for unpacking + return None, None, None + except Exception: + return None, None, None def charge_phase(self): - """Handle the battery charging phase""" self.update_phase.emit("Charge") - self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.4f}A") - - try: - # Configure channels - Channel A sources current, Channel B measures voltage - self.device.channels['B'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].mode = pysmu.Mode.SIMV - self.device.channels['A'].constant(self.test_current) - self.parent.measurement_thread.set_direction(1) # Source current - - # Small delay to allow current to stabilize + self._set_direction_on_measurement_thread(1) + self._set_simv_current(abs(self.test_current)) + while self._running: + v, i, ts = self.get_latest_measurement() + if v is None: + time.sleep(0.05); continue + if v >= self.charge_cutoff: + break time.sleep(0.1) - - while self._running: - voltage, current = self.get_latest_measurement() - if voltage is None: - continue - - # Update parent's data for logging/display - with self.parent.plot_mutex: - if len(self.parent.voltage_data) > 0: - self.parent.voltage_data[-1] = voltage - self.parent.current_data[-1] = current - - if voltage >= self.charge_cutoff: - break - - time.sleep(0.1) - - finally: - self.device.channels['A'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].constant(0) + self._hi_z() def discharge_phase(self): - """Handle the battery discharging phase""" - voltage, _ = self.get_latest_measurement() - if voltage is not None and voltage <= self.discharge_cutoff: - self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)") - return self.update_phase.emit("Discharge") - self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A") - - try: - # Configure channels - Channel A sinks current, Channel B measures voltage - self.device.channels['B'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].mode = pysmu.Mode.SIMV - self.device.channels['A'].constant(-self.test_current) - self.parent.measurement_thread.set_direction(-1) # Sink current - - # Small delay to allow current to stabilize + self._set_direction_on_measurement_thread(-1) + self._set_simv_current(-abs(self.test_current)) + while self._running: + v, i, ts = self.get_latest_measurement() + if v is None: + time.sleep(0.05); continue + if v <= self.discharge_cutoff: + break time.sleep(0.1) - - while self._running: - voltage, current = self.get_latest_measurement() - if voltage is None: - continue - - # Update parent's data for logging/display - with self.parent.plot_mutex: - if len(self.parent.voltage_data) > 0: - self.parent.voltage_data[-1] = voltage - self.parent.current_data[-1] = current - - if voltage <= self.discharge_cutoff: - break - - time.sleep(0.1) - - finally: - self.device.channels['A'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].constant(0) - + self._hi_z() + def rest_phase(self, phase_name): - """Handle rest period between phases""" - self.update_phase.emit(f"Resting ({phase_name})") + self.update_phase.emit(f"Rest: {phase_name}") rest_end = time.time() + self.rest_time - while time.time() < rest_end and self._running: - time_left = max(0, rest_end - time.time()) - self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min") - time.sleep(1) - - def stop(self): - self._running = False - try: - self.device.channels['A'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].constant(0) - except Exception as e: - print(f"Error stopping device: {e}") + time.sleep(0.5) + self.update_status.emit(f"Resting | {phase_name} | {max(0, rest_end-time.time()):.0f}s left") def run(self): - """Main test sequence loop""" try: - first_cycle = True # Ensure at least one cycle runs - - while (self._running and - (self.parent.continuous_mode_check.isChecked() or first_cycle)): - self.parent.request_stop = False - self.parent.cycle_count += 1 - first_cycle = False # Only True for the first cycle - - # 1. Charge phase (constant current) - self.charge_phase() - if not self._running or self.parent.request_stop: - break - - # 2. Rest period after charge - self.rest_phase("Post-Charge") - if not self._running or self.parent.request_stop: - break - - # 3. Discharge phase (capacity measurement) + while self._running: self.discharge_phase() - if not self._running or self.parent.request_stop: + if not self._running: break + if self.rest_time > 0: self.rest_phase("Post-Discharge") + if not self._running: break + self.charge_phase() + if not self._running: break + if self.rest_time > 0: self.rest_phase("Post-Charge") + if not self.continuous_mode: break - - # 4. Rest period after discharge (only if not stopping) - if self._running and not self.parent.request_stop: - self.rest_phase("Post-Discharge") - - # Calculate Coulomb efficiency if not stopping - if not self.parent.request_stop and self.parent.charge_capacity > 0: - self.parent.coulomb_efficiency = ( - self.parent.capacity_ah / self.parent.charge_capacity - ) * 100 - - # Test completed self.test_completed.emit() - except Exception as e: - self.error_occurred.emit(f"Test sequence error: {str(e)}") + self.error_occurred.emit(str(e)) finally: + try: self._hi_z() + except: pass self.finished.emit() -class DischargeWorker(QObject): +class DischargeWorker(QThread): finished = pyqtSignal() update_status = pyqtSignal(str) test_completed = pyqtSignal() error_occurred = pyqtSignal(str) - - def __init__(self, device, test_current, discharge_cutoff, parent): - super().__init__() - self.device = device - self.test_current = test_current - self.discharge_cutoff = discharge_cutoff - self.parent = parent + + def __init__(self, device_manager, discharge_current, cutoff_voltage, parent=None): + super().__init__(parent) + self.dev_manager = device_manager + self.device = device_manager.dev + self.discharge_current = float(discharge_current) + self.cutoff_voltage = float(cutoff_voltage) self._running = True - self.voltage_timeout = 0.5 # seconds + self.voltage_timeout = 0.5 + + def stop(self): + self._running = False + + def _set_direction_on_measurement_thread(self, direction): + try: + mt = getattr(self.dev_manager, 'measurement_thread', None) + if mt and hasattr(mt, 'set_direction'): + mt.set_direction(direction) + except Exception: + pass + + def _set_simv_current(self, current): + try: + self.dev_manager.set_simv_current(current) + except DeviceDisconnectedError: + logger.error("Device disconnected during current set") + raise + + def _hi_z(self): + try: + self.dev_manager.hi_z() + except DeviceDisconnectedError: + logger.error("Device disconnected during hi-z") + raise def get_latest_measurement(self): - """Thread-safe measurement reading with timeout""" try: - return self.parent.measurement_thread.measurement_queue.get( - timeout=self.voltage_timeout - ) + q = self.dev_manager.measurement_thread.measurement_queue + item = q.get(timeout=self.voltage_timeout) + if isinstance(item, (tuple, list)): + if len(item) >= 3 and isinstance(item[0], str): + _, v, i, *rest = item; ts = rest[0] if rest else None; return v, i, ts + elif len(item) >= 2: + v = item[0]; i = item[1]; ts = item[2] if len(item)>=3 else None; return v, i, ts + return None, None, None except Empty: - return (None, None) # Return tuple for unpacking - - def discharge_phase(self): - """Handle the battery discharging phase""" - voltage, _ = self.get_latest_measurement() - if voltage is not None and voltage <= self.discharge_cutoff: - self.update_status.emit(f"Already below discharge cutoff ({voltage:.4f}V ≤ {self.discharge_cutoff}V)") - return - self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.4f}A") - - try: - # Configure channels - Channel A sinks current, Channel B measures voltage - self.device.channels['B'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].mode = pysmu.Mode.SIMV - self.device.channels['A'].constant(-self.test_current) - self.parent.measurement_thread.set_direction(-1) # Sink current - - # Small delay to allow current to stabilize - time.sleep(0.1) - - while self._running: - voltage, current = self.get_latest_measurement() - if voltage is None: - continue - - # Update parent's data for logging/display - if self.parent.active_device: - with self.parent.active_device.plot_mutex: - if self.parent.active_device.voltage_data: - self.parent.active_device.voltage_data[-1] = voltage - self.parent.active_device.current_data[-1] = current - - if voltage <= self.discharge_cutoff: - break - - time.sleep(0.1) - - finally: - self.device.channels['A'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].constant(0) - - def stop(self): - self._running = False - try: - self.device.channels['A'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].constant(0) - except Exception as e: - print(f"Error stopping device: {e}") + return None, None, None + except Exception: + return None, None, None def run(self): - """Main discharge sequence""" try: - self.parent.request_stop = False - self.parent.cycle_count = 1 # Only one discharge cycle - - # Discharge phase - self.discharge_phase() - - if not self._running or self.parent.request_stop: - return - - # Test completed + self._set_direction_on_measurement_thread(-1) + self._set_simv_current(-abs(self.discharge_current)) + while self._running: + v, i, ts = self.get_latest_measurement() + if v is None: + time.sleep(0.05); continue + if v <= self.cutoff_voltage: + break + time.sleep(0.1) self.test_completed.emit() - except Exception as e: - self.error_occurred.emit(f"Discharge error: {str(e)}") + self.error_occurred.emit(str(e)) finally: + try: self._hi_z() + except: pass self.finished.emit() -class ChargeWorker(QObject): +class ChargeWorker(QThread): finished = pyqtSignal() update_status = pyqtSignal(str) test_completed = pyqtSignal() error_occurred = pyqtSignal(str) - - def __init__(self, device, test_current, charge_cutoff, parent): - super().__init__() - self.device = device - self.test_current = test_current - self.charge_cutoff = charge_cutoff - self.parent = parent - self._running = True - def run(self): - """Main charge sequence""" - try: - self.parent.measurement_thread.set_direction(1) # Source current - - # Configure channels - Channel A sources current, Channel B measures voltage - self.device.channels['B'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].mode = pysmu.Mode.SIMV - self.device.channels['A'].constant(self.test_current) - time.sleep(0.1) # Allow current to stabilize - - while self._running: - voltage, current = self.parent.get_latest_measurement() - if voltage is None: - continue - - # Update parent's data for logging/display - if self.parent.active_device: - with self.parent.active_device.plot_mutex: - if self.parent.active_device.voltage_data: - self.parent.active_device.voltage_data[-1] = voltage - self.parent.active_device.current_data[-1] = current - - if voltage >= self.charge_cutoff: - break - - time.sleep(0.1) - - self.test_completed.emit() - except Exception as e: - self.error_occurred.emit(f"Charge error: {str(e)}") - finally: - self.device.channels['A'].constant(0) - self.finished.emit() + def __init__(self, device_manager, charge_current, cutoff_voltage, parent=None): + super().__init__(parent) + self.dev_manager = device_manager + self.device = device_manager.dev + self.charge_current = float(charge_current) + self.cutoff_voltage = float(cutoff_voltage) + self._running = True + self.voltage_timeout = 0.5 def stop(self): self._running = False - try: - self.device.channels['A'].mode = pysmu.Mode.HI_Z - self.device.channels['A'].constant(0) - except Exception as e: - print(f"Error stopping device: {e}") + def _set_direction_on_measurement_thread(self, direction): + try: + mt = getattr(self.dev_manager, 'measurement_thread', None) + if mt and hasattr(mt, 'set_direction'): + mt.set_direction(direction) + except Exception: + pass + + def _set_simv_current(self, current): + try: + self.dev_manager.set_simv_current(current) + except DeviceDisconnectedError: + logger.error("Device disconnected during current set") + raise + + def _hi_z(self): + try: + self.dev_manager.hi_z() + except DeviceDisconnectedError: + logger.error("Device disconnected during hi-z") + raise + + def get_latest_measurement(self): + try: + q = self.dev_manager.measurement_thread.measurement_queue + item = q.get(timeout=self.voltage_timeout) + if isinstance(item, (tuple, list)): + if len(item) >= 3 and isinstance(item[0], str): + _, v, i, *rest = item; ts = rest[0] if rest else None; return v, i, ts + elif len(item) >= 2: + v = item[0]; i = item[1]; ts = item[2] if len(item)>=3 else None; return v, i, ts + return None, None, None + except Empty: + return None, None, None + except Exception: + return None, None, None + + def run(self): + try: + self._set_direction_on_measurement_thread(1) + self._set_simv_current(abs(self.charge_current)) + while self._running: + v, i, ts = self.get_latest_measurement() + if v is None: + time.sleep(0.05); continue + if v >= self.cutoff_voltage: + break + time.sleep(0.1) + self.test_completed.emit() + except Exception as e: + self.error_occurred.emit(str(e)) + finally: + try: self._hi_z() + except: pass + self.finished.emit() + class BatteryTester(QMainWindow): def __init__(self): self.plot_mutex = threading.Lock() @@ -1320,7 +1147,7 @@ class BatteryTester(QMainWindow): self.apply_button_style() self.status_bar.showMessage(f"Mode changed to {mode_name}") - + def reset_test(self): if not self.active_device: return @@ -1328,7 +1155,7 @@ class BatteryTester(QMainWindow): dev_manager = self.active_device dev_manager.reset_data() # Reset in DeviceManager - # UI zurücksetzen + # Reset UI self.capacity_label.setText("0.0000") self.energy_label.setText("0.0000") self.cycle_label.setText("0") @@ -1413,8 +1240,8 @@ class BatteryTester(QMainWindow): self.ax.grid(True, color='#4C566A') # Position legends - self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) - self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) + self.ax.legend(loc='upper left') + self.ax2.legend(loc='upper right') # Embed plot self.canvas = FigureCanvas(self.fig) @@ -1452,9 +1279,10 @@ class BatteryTester(QMainWindow): self.active_device = None for dev in self.session.devices: - manager = DeviceManager(dev) - manager.start_measurement(interval=self.interval) - self.devices[dev.serial] = manager + serial = dev.serial if hasattr(dev, 'serial') else str(dev[0]) + logger.info(f"Found device: {serial}") + manager = DeviceManager(dev=dev, serial=serial) + self.devices[serial] = manager # Connect signals for all devices manager.measurement_thread.update_signal.connect(self.update_measurements) @@ -1612,17 +1440,38 @@ class BatteryTester(QMainWindow): # Store current device state before switching old_device = self.active_device + # Disconnect signals from old device's measurement thread + if old_device and old_device.measurement_thread: + try: + # Safely disconnect signals + old_device.measurement_thread.update_signal.disconnect(self.update_measurements) + old_device.measurement_thread.error_signal.disconnect(self.handle_device_error) + except TypeError: + # Signals weren't connected - safe to ignore + pass + # Activate new device self.active_device = self.devices[serial] - # Reconnect signals for new device - self.measurement_thread = self.active_device.measurement_thread - self.measurement_thread.update_signal.connect(self.update_measurements) - self.measurement_thread.error_signal.connect(self.handle_device_error) + # Ensure measurement thread exists + if not hasattr(self.active_device, 'measurement_thread'): + # Create measurement thread if missing + self.active_device.measurement_thread = MeasurementThread( + self.active_device, + self.interval, + self.active_device + ) - # Restart measurement if not running - if not self.measurement_thread.isRunning(): - self.active_device.start_measurement(self.interval) + # Get reference to new thread + new_thread = self.active_device.measurement_thread + + # Connect signals for new device + new_thread.update_signal.connect(self.update_measurements) + new_thread.error_signal.connect(self.handle_device_error) + + # Start measurement if not running + if not new_thread.isRunning(): + new_thread.start() # Update UI with current device data self.update_ui_from_active_device() @@ -1631,7 +1480,7 @@ class BatteryTester(QMainWindow): if old_device and old_device.is_recording: # Ensure old device continues recording if not old_device.measurement_thread.isRunning(): - old_device.start_measurement(self.interval) + old_device.measurement_thread.start() # Update recording button for new device self.record_button.setChecked(self.active_device.is_recording) @@ -1908,7 +1757,7 @@ class BatteryTester(QMainWindow): # Start test sequence self.test_sequence_thread = QThread() self.test_sequence_worker = TestSequenceWorker( - dev, + self.active_device, test_current, self.charge_cutoff, self.discharge_cutoff, @@ -1940,7 +1789,7 @@ class BatteryTester(QMainWindow): self.discharge_cutoff = float(self.discharge_cutoff_input.text()) self.discharge_thread = QThread() - self.discharge_worker = DischargeWorker(dev, test_current, self.discharge_cutoff, self) + self.discharge_worker = DischargeWorker(self.active_device, test_current, self.discharge_cutoff, self) self.discharge_worker.moveToThread(self.discharge_thread) self.discharge_worker.update_status.connect(self.status_bar.showMessage) @@ -1964,7 +1813,7 @@ class BatteryTester(QMainWindow): self.charge_cutoff = float(self.charge_cutoff_input.text()) self.charge_thread = QThread() - self.charge_worker = ChargeWorker(dev, test_current, self.charge_cutoff, self) + self.charge_worker = ChargeWorker(self.active_device, test_current, self.charge_cutoff, self) self.charge_worker.moveToThread(self.charge_thread) self.charge_worker.update_status.connect(self.status_bar.showMessage) @@ -2874,10 +2723,14 @@ class BatteryTester(QMainWindow): @pyqtSlot(str) def handle_device_error(self, error_msg): """Handle device errors with proper connection status""" - self.handle_device_connection(False, f"Error: {error_msg}") + logger.error(f"Device error: {error_msg}") + self.set_connection_status(f"Error: {error_msg}", "orange") self.reconnect_btn.setVisible(True) self.reconnect_btn.setEnabled(True) self.toggle_button.setEnabled(False) + + # Attempt automatic recovery + QTimer.singleShot(2000, self.reconnect_device) def validate_measurements(self, voltage, current): """Filter out invalid measurements"""