MainCode/adalm1000_logger.py aktualisiert

# -*- coding: utf-8 -*-
import os
import time
import csv
import threading
from datetime import datetime
import numpy as np
import matplotlib
matplotlib.use('Qt5Agg')
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
from collections import deque
from queue import Queue, Full, Empty

from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, 
                             QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread
import pysmu

class DeviceDisconnectedError(Exception):
    pass

class MeasurementThread(QThread):
    update_signal = pyqtSignal(float, float, float)
    error_signal = pyqtSignal(str)

    def __init__(self, device, interval=0.1):
        super().__init__()
        self.device = device
        self.interval = interval
        self._running = False
        self.filter_window_size = 10
        self.voltage_window = []
        self.current_window = []
        self.start_time = time.time()
        self.measurement_queue = Queue(maxsize=1)

    def run(self):
        self._running = True
        while self._running:
            try:
                samples = self.device.read(self.filter_window_size, 500, True)
                if not samples:
                    raise DeviceDisconnectedError("No samples received")
                
                current_time = time.time() - self.start_time
                
                # Get voltage from Channel B (HI_Z mode) and current from Channel A
                raw_voltage = np.mean([s[1][0] for s in samples])  # Channel B voltage
                raw_current = np.mean([s[0][1] for s in samples])  # Channel A current
                
                # Update filter windows
                self.voltage_window.append(raw_voltage)
                self.current_window.append(raw_current)
                
                if len(self.voltage_window) > self.filter_window_size:
                    self.voltage_window.pop(0)
                    self.current_window.pop(0)
                
                voltage = np.mean(self.voltage_window)
                current = np.mean(self.current_window)
                
                # Emit update
                self.update_signal.emit(voltage, current, current_time)
                
                # Store measurement
                try:
                    self.measurement_queue.put_nowait((voltage, current))
                except Full:
                    pass
                
                time.sleep(max(0.05, self.interval))
                
            except Exception as e:
                self.error_signal.emit(f"Read error: {str(e)}")
                time.sleep(1)
                continue

    def stop(self):
        self._running = False
        self.wait(500)

class TestSequenceWorker(QObject):
    finished = pyqtSignal()
    update_phase = pyqtSignal(str)
    update_status = pyqtSignal(str)
    test_completed = pyqtSignal()
    error_occurred = pyqtSignal(str)
    
    def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent):
        super().__init__()
        self.device = device
        self.test_current = test_current
        self.charge_cutoff = charge_cutoff
        self.discharge_cutoff = discharge_cutoff
        self.rest_time = rest_time * 3600  # Convert hours to seconds
        self.continuous_mode = continuous_mode
        self.parent = parent
        self._running = True
        self.voltage_timeout = 0.5  # seconds

    def get_latest_measurement(self):
        """Thread-safe measurement reading with timeout"""
        try:
            return self.parent.measurement_thread.measurement_queue.get(
                timeout=self.voltage_timeout
            )
        except Empty:
            return (None, None)  # Return tuple for unpacking

    def charge_phase(self):
        """Handle the battery charging phase"""
        self.update_phase.emit("Charge")
        self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.3f}A")
        
        try:
            # Configure channels - Channel A sources current, Channel B measures voltage
            self.device.channels['B'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].mode = pysmu.Mode.SIMV
            self.device.channels['A'].constant(self.test_current)
            
            # Small delay to allow current to stabilize
            time.sleep(0.1)
            
            while self._running:
                voltage, current = self.get_latest_measurement()
                if voltage is None:
                    continue
                    
                # Update parent's data for logging/display
                with self.parent.plot_mutex:
                    if len(self.parent.voltage_data) > 0:
                        self.parent.voltage_data[-1] = voltage
                        self.parent.current_data[-1] = current
                
                if voltage >= self.charge_cutoff:
                    break
                    
                time.sleep(0.1)
                
        finally:
            self.device.channels['A'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].constant(0)

    def discharge_phase(self):
        """Handle the battery discharging phase"""
        self.update_phase.emit("Discharge")
        self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A")
        
        try:
            # Configure channels - Channel A sinks current, Channel B measures voltage
            self.device.channels['B'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].mode = pysmu.Mode.SIMV
            self.device.channels['A'].constant(-self.test_current)
            
            # Small delay to allow current to stabilize
            time.sleep(0.1)
            
            while self._running:
                voltage, current = self.get_latest_measurement()
                if voltage is None:
                    continue
                    
                # Update parent's data for logging/display
                with self.parent.plot_mutex:
                    if len(self.parent.voltage_data) > 0:
                        self.parent.voltage_data[-1] = voltage
                        self.parent.current_data[-1] = current
                
                if voltage <= self.discharge_cutoff:
                    break
                    
                time.sleep(0.1)
                
        finally:
            self.device.channels['A'].mode = pysmu.Mode.HI_Z
            self.device.channels['A'].constant(0)
        
    def rest_phase(self, phase_name):
        """Handle rest period between phases"""
        self.update_phase.emit(f"Resting ({phase_name})")
        rest_end = time.time() + self.rest_time
        
        while time.time() < rest_end and self._running:
            time_left = max(0, rest_end - time.time())
            self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min")
            time.sleep(1)
            
    def stop(self):
        """Request the thread to stop"""
        self._running = False
        self.device.channels['A'].mode = pysmu.Mode.HI_Z
        self.device.channels['A'].constant(0)

    def run(self):
        """Main test sequence loop"""
        try:
            while self._running and (self.continuous_mode or self.parent.cycle_count == 0):
                # Reset stop request at start of each cycle
                self.parent.request_stop = False
                self.parent.cycle_count += 1

                # 1. Charge phase (constant current)
                self.charge_phase()
                if not self._running or self.parent.request_stop:
                    break
                    
                # 2. Rest period after charge
                self.rest_phase("Post-Charge")
                if not self._running or self.parent.request_stop:
                    break
                    
                # 3. Discharge phase (capacity measurement)
                self.discharge_phase()
                if not self._running or self.parent.request_stop:
                    break
                    
                # 4. Rest period after discharge (only if not stopping)
                if self._running and not self.parent.request_stop:
                    self.rest_phase("Post-Discharge")
                    
                # Calculate Coulomb efficiency if not stopping
                if not self.parent.request_stop and self.parent.charge_capacity > 0:
                    self.parent.coulomb_efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100
                    
            # Test completed
            self.test_completed.emit()
            
        except Exception as e:
            self.error_occurred.emit(f"Test sequence error: {str(e)}")
        finally:
            self.finished.emit()

class BatteryTester(QMainWindow):
    def __init__(self):
        self.plot_mutex = threading.Lock()
        super().__init__()

        # Color scheme
        self.bg_color = "#2E3440"
        self.fg_color = "#D8DEE9"
        self.accent_color = "#5E81AC"
        self.warning_color = "#BF616A"
        self.success_color = "#A3BE8C"
        
        # Device and measurement state
        self.session_active = False
        self.measuring = False
        self.test_running = False
        self.continuous_mode = False  
        self.request_stop = False  
        self.interval = 0.1  
        self.log_dir = os.path.expanduser("~/adalm1000/logs")
        os.makedirs(self.log_dir, exist_ok=True)
        
        # Data buffers
        self.time_data = deque()
        self.voltage_data = deque()
        self.current_data = deque()
        self.phase_data = deque()
        
        # Initialize UI and device
        self.setup_ui()
        self.init_device()
        
        # Set window properties
        self.setWindowTitle("ADALM1000 - Battery Capacity Tester (CC Test)")
        self.resize(1000, 800)
        self.setMinimumSize(800, 700)
        
        # Status update timer
        self.status_timer = QTimer()
        self.status_timer.timeout.connect(self.update_status)
        self.status_timer.start(1000)  # Update every second

    def setup_ui(self):
        """Configure the user interface"""
        # Main widget and layout
        self.central_widget = QWidget()
        self.setCentralWidget(self.central_widget)
        self.main_layout = QVBoxLayout(self.central_widget)
        self.main_layout.setContentsMargins(10, 10, 10, 10)
        
        # Header area
        header_frame = QFrame()
        header_frame.setFrameShape(QFrame.NoFrame)
        header_layout = QHBoxLayout(header_frame)
        header_layout.setContentsMargins(0, 0, 0, 0)
        
        self.title_label = QLabel("ADALM1000 Battery Capacity Tester (CC Test)")
        self.title_label.setStyleSheet(f"font-size: 14pt; font-weight: bold; color: {self.accent_color};")
        header_layout.addWidget(self.title_label, 1)
        
        # Status indicator
        self.status_light = QLabel()
        self.status_light.setFixedSize(20, 20)
        self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
        header_layout.addWidget(self.status_light)
        
        self.connection_label = QLabel("Disconnected")
        header_layout.addWidget(self.connection_label)
        
        # Reconnect button
        self.reconnect_btn = QPushButton("Reconnect")
        self.reconnect_btn.clicked.connect(self.reconnect_device)
        header_layout.addWidget(self.reconnect_btn)
        
        self.main_layout.addWidget(header_frame)
        
        # Measurement display
        display_frame = QFrame()
        display_frame.setFrameShape(QFrame.StyledPanel)
        display_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
        display_layout = QGridLayout(display_frame)
        
        # Measurement values
        measurement_labels = [
            ("Voltage", "V"),       ("Current", "A"),       ("Test Phase", ""),
            ("Elapsed Time", "s"),      ("Discharge Capacity", "Ah"), ("Charge Capacity", "Ah"),
            ("Coulomb Eff.", "%"),      ("Cycle Count", ""),        ("Battery Temp", "°C"),
            ("Internal R", "Ω"),        ("Power", "W"),             ("Energy", "Wh")
        ]

        # 4 Zeilen × 3 Spalten Anordnung
        for i, (label, unit) in enumerate(measurement_labels):
            row = i // 3  # 0-3 (4 Zeilen)
            col = (i % 3) * 3  # 0, 3, 6 (3 Spalten mit je 3 Widgets)
            
            # Label für den Messwertnamen
            lbl = QLabel(f"{label}:")
            lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;")
            display_layout.addWidget(lbl, row, col)
            
            # Label für den Messwert
            value_lbl = QLabel("0.000")
            value_lbl.setStyleSheet(f"""
                color: {self.fg_color}; 
                font-weight: bold; 
                font-size: 12px;
                min-width: 60px;
            """)
            display_layout.addWidget(value_lbl, row, col + 1)
            
            # Einheit falls vorhanden
            if unit:
                unit_lbl = QLabel(unit)
                unit_lbl.setStyleSheet(f"color: {self.fg_color}; font-size: 11px;")
                display_layout.addWidget(unit_lbl, row, col + 2)

        # Spaltenabstände anpassen
        for i in range(9):  # 3 Spalten × 3 Widgets
            display_layout.setColumnStretch(i, 1 if i % 3 == 1 else 0)  # Nur Wert-Spalten dehnen

        # Referenzen aktualisieren
        self.voltage_label = display_layout.itemAtPosition(0, 1).widget()
        self.current_label = display_layout.itemAtPosition(0, 4).widget()
        self.phase_label = display_layout.itemAtPosition(0, 7).widget()
        self.time_label = display_layout.itemAtPosition(1, 1).widget()
        self.capacity_label = display_layout.itemAtPosition(1, 4).widget()
        self.charge_capacity_label = display_layout.itemAtPosition(1, 7).widget()
        self.efficiency_label = display_layout.itemAtPosition(2, 1).widget()
        self.cycle_label = display_layout.itemAtPosition(2, 4).widget()
        self.temp_label = display_layout.itemAtPosition(2, 7).widget()
        self.resistance_label = display_layout.itemAtPosition(3, 1).widget()
        self.power_label = display_layout.itemAtPosition(3, 4).widget()
        self.energy_label = display_layout.itemAtPosition(3, 7).widget()
        
        self.main_layout.addWidget(display_frame)
        
        # Control area
        controls_frame = QFrame()
        controls_frame.setFrameShape(QFrame.NoFrame)
        controls_layout = QHBoxLayout(controls_frame)
        controls_layout.setContentsMargins(0, 0, 0, 0)
        
        # Parameters frame
        params_frame = QFrame()
        params_frame.setFrameShape(QFrame.StyledPanel)
        params_frame.setStyleSheet(f"QFrame {{ border: 1px solid {self.accent_color}; border-radius: 5px; }}")
        params_layout = QGridLayout(params_frame)
        
        # Battery capacity
        self.capacity = 0.2
        self.capacity_label_input = QLabel("Battery Capacity (Ah):")
        self.capacity_label_input.setStyleSheet(f"color: {self.fg_color};")
        params_layout.addWidget(self.capacity_label_input, 0, 0)
        self.capacity_input = QLineEdit("0.2")
        self.capacity_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
        self.capacity_input.setFixedWidth(60)
        params_layout.addWidget(self.capacity_input, 0, 1)
        
        # Charge cutoff
        self.charge_cutoff = 1.43
        self.charge_cutoff_label = QLabel("Charge Cutoff (V):")
        self.charge_cutoff_label.setStyleSheet(f"color: {self.fg_color};")
        params_layout.addWidget(self.charge_cutoff_label, 1, 0)
        self.charge_cutoff_input = QLineEdit("1.43")
        self.charge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
        self.charge_cutoff_input.setFixedWidth(60)
        params_layout.addWidget(self.charge_cutoff_input, 1, 1)
        
        # Discharge cutoff
        self.discharge_cutoff = 0.9
        self.discharge_cutoff_label = QLabel("Discharge Cutoff (V):")
        self.discharge_cutoff_label.setStyleSheet(f"color: {self.fg_color};")
        params_layout.addWidget(self.discharge_cutoff_label, 2, 0)
        self.discharge_cutoff_input = QLineEdit("0.9")
        self.discharge_cutoff_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
        self.discharge_cutoff_input.setFixedWidth(60)
        params_layout.addWidget(self.discharge_cutoff_input, 2, 1)
        
        # Rest time
        self.rest_time = 0.25
        self.rest_time_label = QLabel("Rest Time (hours):")
        self.rest_time_label.setStyleSheet(f"color: {self.fg_color};")
        params_layout.addWidget(self.rest_time_label, 3, 0)
        self.rest_time_input = QLineEdit("0.25")
        self.rest_time_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
        self.rest_time_input.setFixedWidth(60)
        params_layout.addWidget(self.rest_time_input, 3, 1)
        
        # C-rate for test
        self.c_rate = 0.1
        self.c_rate_label = QLabel("Test C-rate:")
        self.c_rate_label.setStyleSheet(f"color: {self.fg_color};")
        params_layout.addWidget(self.c_rate_label, 0, 2)
        self.c_rate_input = QLineEdit("0.1")
        self.c_rate_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
        self.c_rate_input.setFixedWidth(40)
        params_layout.addWidget(self.c_rate_input, 0, 3)
        
        c_rate_note = QLabel("(e.g., 0.2 for C/5)")
        c_rate_note.setStyleSheet(f"color: {self.fg_color};")
        params_layout.addWidget(c_rate_note, 0, 4)
        
        controls_layout.addWidget(params_frame, 1)

        # Test conditions input
        self.test_conditions_label = QLabel("Test Conditions/Chemistry:")
        self.test_conditions_label.setStyleSheet(f"color: {self.fg_color};")
        params_layout.addWidget(self.test_conditions_label, 4, 0)
        self.test_conditions_input = QLineEdit("")
        self.test_conditions_input.setStyleSheet(f"background-color: #3B4252; color: {self.fg_color};")
        self.test_conditions_input.setFixedWidth(120)
        params_layout.addWidget(self.test_conditions_input, 4, 1)
        
        # Button frame
        button_frame = QFrame()
        button_frame.setFrameShape(QFrame.NoFrame)
        button_layout = QVBoxLayout(button_frame)
        button_layout.setContentsMargins(0, 0, 0, 0)
        
        self.start_button = QPushButton("START TEST")
        self.start_button.setStyleSheet(f"""
            QPushButton {{
                background-color: {self.accent_color};
                color: {self.fg_color};
                font-weight: bold;
                padding: 6px;
                border-radius: 4px;
            }}
            QPushButton:disabled {{
                background-color: #4C566A;
                color: #D8DEE9;
            }}
        """)
        self.start_button.clicked.connect(self.start_test)
        button_layout.addWidget(self.start_button)
        
        self.stop_button = QPushButton("STOP TEST")
        self.stop_button.setStyleSheet(f"""
            QPushButton {{
                background-color: {self.warning_color};
                color: {self.fg_color};
                font-weight: bold;
                padding: 6px;
                border-radius: 4px;
            }}
            QPushButton:disabled {{
                background-color: #4C566A;
                color: #D8DEE9;
            }}
        """)
        self.stop_button.clicked.connect(self.stop_test)
        self.stop_button.setEnabled(False)
        button_layout.addWidget(self.stop_button)
        
        # Continuous mode checkbox
        self.continuous_mode_check = QCheckBox("Continuous Mode")
        self.continuous_mode_check.setChecked(True)
        self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")
        button_layout.addWidget(self.continuous_mode_check)
        
        controls_layout.addWidget(button_frame)
        self.main_layout.addWidget(controls_frame)
        
        # Plot area
        self.setup_plot()
        
        # Status bar
        self.status_bar = self.statusBar()
        self.status_bar.setStyleSheet(f"color: {self.fg_color};")
        self.status_bar.showMessage("Ready")
        
        # Apply dark theme
        self.setStyleSheet(f"""
            QMainWindow {{
                background-color: {self.bg_color};
            }}
            QLabel {{
                color: {self.fg_color};
            }}
            QLineEdit {{
                background-color: #3B4252;
                color: {self.fg_color};
                border: 1px solid #4C566A;
                border-radius: 3px;
                padding: 2px;
            }}
        """)

    def setup_plot(self):
        """Configure the matplotlib plot"""
        self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color)
        self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
        self.ax = self.fig.add_subplot(111)
        self.ax.set_facecolor('#3B4252')

        # Set initial voltage range
        voltage_padding = 0.2
        min_voltage = max(0, 0.9 - voltage_padding)
        max_voltage = 1.43 + voltage_padding
        self.ax.set_ylim(min_voltage, max_voltage)
        
        # Voltage plot
        self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2)
        self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
        self.ax.tick_params(axis='y', labelcolor='#00BFFF')

        # Current plot (right axis)
        self.ax2 = self.ax.twinx()
        current_padding = 0.05
        test_current = 0.1 * 0.2  # Default values
        max_current = test_current * 1.5
        self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
        
        self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
        self.ax2.set_ylabel("Current (A)", color='r')
        self.ax2.tick_params(axis='y', labelcolor='r')

        self.ax.set_xlabel('Time (s)', color=self.fg_color)
        self.ax.set_title('Battery Test (CC)', color=self.fg_color)
        self.ax.tick_params(axis='x', colors=self.fg_color)
        self.ax.grid(True, color='#4C566A')

        # Position legends
        self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
        self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))

        # Embed plot
        self.canvas = FigureCanvas(self.fig)
        self.canvas.setStyleSheet(f"background-color: {self.bg_color};")
        self.main_layout.addWidget(self.canvas, 1)

    def init_device(self):
        """Initialize the ADALM1000 device with continuous measurement"""
        try:
            # Clean up any existing session
            if hasattr(self, 'session'):
                try:
                    self.session.end()
                    del self.session
                except:
                    pass
                    
            time.sleep(1)
            
            self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
            if not self.session.devices:
                raise Exception("No ADALM1000 detected - check connections")

            self.dev = self.session.devices[0]
            self.dev.channels['A'].mode = pysmu.Mode.HI_Z
            self.dev.channels['B'].mode = pysmu.Mode.HI_Z
            self.dev.channels['A'].constant(0)
            self.dev.channels['B'].constant(0)
            
            self.session.start(0)

            self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;")
            self.connection_label.setText("Connected")
            self.status_bar.showMessage("Device connected | Ready to measure")
            self.session_active = True
            self.start_button.setEnabled(True)

            # Start measurement thread
            self.measurement_thread = MeasurementThread(self.dev, self.interval)
            self.measurement_thread.update_signal.connect(self.update_measurements)
            self.measurement_thread.error_signal.connect(self.handle_device_error)
            
            # Start the QThread directly (no need for threading.Thread)
            self.measurement_thread.start()

        except Exception as e:
            self.handle_device_error(str(e))

    @pyqtSlot(float, float, float)
    def update_measurements(self, voltage, current, current_time):
        """Update measurements from the measurement thread"""
        self.time_data.append(current_time)
        self.voltage_data.append(voltage)
        self.current_data.append(current)
        
        # Update display
        self.voltage_label.setText(f"{voltage:.4f}")
        self.current_label.setText(f"{current:.4f}")
        self.time_label.setText(self.format_time(current_time))
        
        # Throttle plot updates to avoid recursive repaint
        now = time.time()
        if not hasattr(self, '_last_plot_update'):
            self._last_plot_update = 0
        
        if now - self._last_plot_update > 0.1:  # Update plot max 10 times per second
            self._last_plot_update = now
            QTimer.singleShot(0, self.update_plot)

    def update_status(self):
        """Update status information periodically"""
        if self.test_running:
            # Update capacity calculations if in test mode
            if self.measuring and self.time_data:
                current_time = time.time() - self.start_time
                delta_t = current_time - self.last_update_time
                self.last_update_time = current_time
                
                if self.test_phase == "Discharge":
                    current_current = abs(self.current_data[-1])
                    self.capacity_ah += current_current * delta_t / 3600
                    self.capacity_label.setText(f"{self.capacity_ah:.4f}")
                elif self.test_phase == "Charge":
                    current_current = abs(self.current_data[-1])
                    self.charge_capacity += current_current * delta_t / 3600
                    self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}")

    def start_test(self):
        """Start the full battery test cycle"""
        if not self.test_running:
            try:
                # Get parameters from UI
                self.capacity = float(self.capacity_input.text())
                self.charge_cutoff = float(self.charge_cutoff_input.text())
                self.discharge_cutoff = float(self.discharge_cutoff_input.text())
                self.rest_time = float(self.rest_time_input.text())
                self.c_rate = float(self.c_rate_input.text())
                
                # Validate inputs
                if self.capacity <= 0:
                    raise ValueError("Battery capacity must be positive")
                if self.charge_cutoff <= self.discharge_cutoff:
                    raise ValueError("Charge cutoff must be higher than discharge cutoff")
                if self.c_rate <= 0:
                    raise ValueError("C-rate must be positive")
                
                test_current = self.c_rate * self.capacity
                if test_current > 0.2:
                    raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
                
                # Clear previous data
                self.time_data.clear()
                self.voltage_data.clear()
                self.current_data.clear()
                self.phase_data.clear()
                self.capacity_ah = 0.0
                self.charge_capacity = 0.0
                self.coulomb_efficiency = 0.0
                self.cycle_count = 0

                # Reset plot with proper ranges
                self.reset_plot()
                
                # Generate filename and create log file
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
                self.create_cycle_log_file()
                
                # Start test
                self.test_running = True
                self.start_time = time.time()
                self.last_update_time = time.time()
                self.test_phase = "Initial Discharge"
                self.phase_label.setText(self.test_phase)
                
                self.start_button.setEnabled(False)
                self.stop_button.setEnabled(True)
                self.status_bar.showMessage(f"Test started | Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A")
                
                # Start test sequence in a QThread
                self.test_sequence_thread = QThread()
                self.test_sequence_worker = TestSequenceWorker(
                    self.dev,
                    test_current,
                    self.charge_cutoff,
                    self.discharge_cutoff,
                    self.rest_time,
                    self.continuous_mode_check.isChecked(),
                    self  # Pass reference to main window for callbacks
                )
                self.test_sequence_worker.moveToThread(self.test_sequence_thread)
                
                # Connect signals
                self.test_sequence_worker.update_phase.connect(self.update_test_phase)
                self.test_sequence_worker.update_status.connect(self.status_bar.showMessage)
                self.test_sequence_worker.test_completed.connect(self.finalize_test)
                self.test_sequence_worker.error_occurred.connect(self.handle_test_error)
                self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit)
                self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater)
                self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater)
                
                # Start the thread and the worker's run method
                self.test_sequence_thread.start()
                QTimer.singleShot(0, self.test_sequence_worker.run)
                
                # Start capacity calculation timer if not already running
                if not self.status_timer.isActive():
                    self.status_timer.start(1000)
                
            except Exception as e:
                QMessageBox.critical(self, "Error", str(e))
                # Ensure buttons are in correct state if error occurs
                self.start_button.setEnabled(True)
                self.stop_button.setEnabled(False)

    def create_cycle_log_file(self):
        """Create a new log file for the current cycle"""
        try:
            # Close previous file if exists
            if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
                try:
                    self.current_cycle_file.close()
                except Exception as e:
                    print(f"Error closing previous log file: {e}")
            
            # Ensure log directory exists
            os.makedirs(self.log_dir, exist_ok=True)
            
            if not os.access(self.log_dir, os.W_OK):
                QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}")
                return False
            
            # Generate unique filename
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv")
            
            # Open new file
            try:
                self.current_cycle_file = open(self.filename, 'w', newline='')
                
                # Write header with test parameters
                test_current = self.c_rate * self.capacity
                test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
                
                self.current_cycle_file.write(f"# ADALM1000 Battery Test Log\n")
                self.current_cycle_file.write(f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                self.current_cycle_file.write(f"# Battery Capacity: {self.capacity} Ah\n")
                self.current_cycle_file.write(f"# Test Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n")
                self.current_cycle_file.write(f"# Charge Cutoff: {self.charge_cutoff} V\n")
                self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n")
                self.current_cycle_file.write(f"# Rest Time: {self.rest_time} hours\n")
                self.current_cycle_file.write(f"# Test Conditions/Chemistry: {test_conditions}\n")
                self.current_cycle_file.write("#\n")
                
                # Write data header
                self.log_writer = csv.writer(self.current_cycle_file)
                self.log_writer.writerow([
                    "Time(s)", "Voltage(V)", "Current(A)", "Phase", 
                    "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", 
                    "Coulomb_Eff(%)", "Cycle"
                ])
                self.log_buffer = []
                return True
            except Exception as e:
                QMessageBox.critical(self, "Error", f"Failed to create log file: {e}")
                return False
        except Exception as e:
            print(f"Error in create_cycle_log_file: {e}")
            return False

    def format_time(self, seconds):
        """Convert seconds to hh:mm:ss format"""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        seconds = int(seconds % 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    def stop_test(self):
        """Request immediate stop of the test"""
        if not self.test_running:
            return
            
        self.request_stop = True
        self.test_running = False
        self.measuring = False
        self.test_phase = "Idle"
        self.phase_label.setText(self.test_phase)
        
        if hasattr(self, 'dev'):
            try:
                self.dev.channels['A'].mode = pysmu.Mode.HI_Z
                self.dev.channels['A'].constant(0)
            except Exception as e:
                print(f"Error resetting device: {e}")
        
        self.time_data.clear()
        self.voltage_data.clear()
        self.current_data.clear()
        self.phase_data.clear()
        
        self.capacity_ah = 0.0
        self.charge_capacity = 0.0
        self.coulomb_efficiency = 0.0
        
        self.reset_plot()
        
        self.status_bar.showMessage("Test stopped - Ready for new test")
        self.stop_button.setEnabled(False)
        self.start_button.setEnabled(True)
        
        self.finalize_test()

    def finalize_test(self):
        """Final cleanup after test completes or is stopped"""
        self.measuring = False
        if hasattr(self, 'dev'):
            try:
                self.dev.channels['A'].constant(0)
            except Exception as e:
                print(f"Error resetting device: {e}")
        test_current = self.c_rate * self.capacity

        # Only try to close if file exists and is open
        if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
            try:
                if self.log_buffer:
                    self.log_writer.writerows(self.log_buffer)
                    self.log_buffer.clear()
                
                # Write test summary
                test_current = self.c_rate * self.capacity
                test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
                
                self.current_cycle_file.write("\n# TEST SUMMARY\n")
                self.current_cycle_file.write(f"# Test Parameters:\n")
                self.current_cycle_file.write(f"# - Battery Capacity: {self.capacity} Ah\n")
                self.current_cycle_file.write(f"# - Test Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n")
                self.current_cycle_file.write(f"# - Charge Cutoff: {self.charge_cutoff} V\n")
                self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n")
                self.current_cycle_file.write(f"# - Test Conditions: {test_conditions}\n")
                self.current_cycle_file.write(f"# Results:\n")
                self.current_cycle_file.write(f"# - Cycles Completed: {self.cycle_count}\n")
                self.current_cycle_file.write(f"# - Final Discharge Capacity: {self.capacity_ah:.4f} Ah\n")
                self.current_cycle_file.write(f"# - Final Charge Capacity: {self.charge_capacity:.4f} Ah\n")
                self.current_cycle_file.write(f"# - Coulombic Efficiency: {self.coulomb_efficiency:.1f}%\n")
                
                self.current_cycle_file.close()
            except Exception as e:
                print(f"Error closing log file: {e}")
            finally:
                self.current_cycle_file = None
        
        self.start_button.setEnabled(True)
        self.stop_button.setEnabled(False)
        self.request_stop = False
        
        message = (
            f"Test safely stopped after discharge phase | "
            f"Cycle {self.cycle_count} completed | "
            f"Final capacity: {self.capacity_ah:.3f}Ah"
        )
        self.status_bar.showMessage(message)
        
        QMessageBox.information(
            self,
            "Test Completed",
            f"Test was safely stopped after discharge phase.\n\n"
            f"Test Parameters:\n"
            f"- Capacity: {self.capacity} Ah\n"
            f"- Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n"
            f"- Charge Cutoff: {self.charge_cutoff} V\n"
            f"- Discharge Cutoff: {self.discharge_cutoff} V\n"
            f"- Conditions: {test_conditions}\n\n"
            f"Results:\n"
            f"- Cycles: {self.cycle_count}\n"
            f"- Discharge capacity: {self.capacity_ah:.3f}Ah\n"
            f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%"
        )

    def reset_plot(self):
        """Reset the plot completely for a new test"""
        self.line_voltage.set_data([], [])
        self.line_current.set_data([], [])
        
        self.time_data.clear()
        self.voltage_data.clear()
        self.current_data.clear()
        
        voltage_padding = 0.2
        min_voltage = max(0, self.discharge_cutoff - voltage_padding)
        max_voltage = self.charge_cutoff + voltage_padding
        self.ax.set_xlim(0, 10)
        self.ax.set_ylim(min_voltage, max_voltage)
        
        current_padding = 0.05
        test_current = self.c_rate * self.capacity
        max_current = test_current * 1.5
        self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
        
        self.canvas.draw()

    def write_cycle_summary(self):
        """Write cycle summary to the current cycle's log file"""
        if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file:
            return
            
        summary_line = (
            f"Cycle {self.cycle_count} Summary - "
            f"Discharge={self.capacity_ah:.4f}Ah, "
            f"Charge={self.charge_capacity:.4f}Ah, "
            f"Efficiency={self.coulomb_efficiency:.1f}%"
        )
        
        try:
            if self.log_buffer:
                self.log_writer.writerows(self.log_buffer)
                self.log_buffer.clear()
            self.current_cycle_file.write(summary_line + "\n")
            self.current_cycle_file.flush()
        except Exception as e:
            print(f"Error writing cycle summary: {e}")

    def update_plot(self):
        """More reliable plotting with better error handling"""
        try:
            # Create local copies safely
            with self.plot_mutex:
                if not self.time_data or not self.voltage_data or not self.current_data:
                    return
                    
                if len(self.time_data) != len(self.voltage_data) or len(self.time_data) != len(self.current_data):
                    # Find the minimum length to avoid mismatch
                    min_len = min(len(self.time_data), len(self.voltage_data), len(self.current_data))
                    x_data = np.array(self.time_data[-min_len:])
                    y1_data = np.array(self.voltage_data[-min_len:])
                    y2_data = np.array(self.current_data[-min_len:])
                else:
                    x_data = np.array(self.time_data)
                    y1_data = np.array(self.voltage_data)
                    y2_data = np.array(self.current_data)
            
            # Update plot data
            self.line_voltage.set_data(x_data, y1_data)
            self.line_current.set_data(x_data, y2_data)
            
            # Auto-scale when needed
            if len(x_data) > 0 and x_data[-1] > self.ax.get_xlim()[1] * 0.8:
                self.auto_scale_axes()
            
            # Force redraw
            self.canvas.draw_idle()
            
        except Exception as e:
            print(f"Plot error: {e}")
            # Reset plot on error
            self.line_voltage.set_data([], [])
            self.line_current.set_data([], [])
            self.canvas.draw_idle()

    def auto_scale_axes(self):
        """Auto-scale plot axes with appropriate padding and strict boundaries"""
        if not self.time_data:
            return
        
        min_time = 0
        max_time = self.time_data[-1]
        current_xlim = self.ax.get_xlim()
        
        if max_time > current_xlim[1] * 0.95:
            new_max = max_time * 1.05
            self.ax.set_xlim(min_time, new_max)
            self.ax2.set_xlim(min_time, new_max)
        
        voltage_padding = 0.2
        if self.voltage_data:
            min_voltage = max(0, min(self.voltage_data) - voltage_padding)
            max_voltage = min(5.0, max(self.voltage_data) + voltage_padding)
            current_ylim = self.ax.get_ylim()
            if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1):
                self.ax.set_ylim(min_voltage, max_voltage)
        
        current_padding = 0.05
        if self.current_data:
            min_current = max(-0.25, min(self.current_data) - current_padding)
            max_current = min(0.25, max(self.current_data) + current_padding)
            current_ylim2 = self.ax2.get_ylim()
            if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02):
                self.ax2.set_ylim(min_current, max_current)

    @pyqtSlot(str)
    def handle_device_error(self, error):
        """Handle device connection errors"""
        error_msg = str(error)
        print(f"Device error: {error_msg}")

        if hasattr(self, 'session'):
            try:
                if self.session_active:
                    self.session.end()
                del self.session
            except Exception as e:
                print(f"Error cleaning up session: {e}")

        self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;")
        self.connection_label.setText("Disconnected")
        self.status_bar.showMessage(f"Device error: {error_msg}")

        self.session_active = False
        self.test_running = False
        self.continuous_mode = False
        self.measuring = False
        
        self.start_button.setEnabled(False)
        self.stop_button.setEnabled(False)

        self.time_data.clear()
        self.voltage_data.clear()
        self.current_data.clear()
    
    @pyqtSlot(str)
    def update_test_phase(self, phase_text):
        """Update the test phase display"""
        self.test_phase = phase_text
        self.phase_label.setText(phase_text)
    
        # Update log if available
        if hasattr(self, 'log_buffer'):
            current_time = time.time() - self.start_time
            self.log_buffer.append([
                f"{current_time:.3f}",
                "",
                "",
                phase_text,
                f"{self.capacity_ah:.4f}",
                f"{self.charge_capacity:.4f}",
                f"{self.coulomb_efficiency:.1f}" if hasattr(self, 'coulomb_efficiency') else "0.0",
                f"{self.cycle_count}"
            ])
        
    @pyqtSlot(str)
    def handle_test_error(self, error_msg):
        """Handle errors from the test sequence with complete cleanup"""
        try:
            # 1. Notify user
            QMessageBox.critical(self, "Test Error", 
                            f"An error occurred:\n{error_msg}\n\nAttempting to recover...")
            
            # 2. Stop all operations
            self.stop_test()
            
            # 3. Reset UI elements
            if hasattr(self, 'line_voltage'):
                try:
                    self.line_voltage.set_data([], [])
                    self.line_current.set_data([], [])
                    self.ax.set_xlim(0, 1)
                    self.ax2.set_xlim(0, 1)
                    self.canvas.draw()
                except Exception as plot_error:
                    print(f"Plot reset error: {plot_error}")
            
            # 4. Update status
            self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...")
            self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")
            
            # 5. Attempt recovery
            QTimer.singleShot(1000, self.attempt_reconnect)  # Delay before reconnect
            
        except Exception as e:
            print(f"Error in error handler: {e}")
            # Fallback - restart application?
            QMessageBox.critical(self, "Fatal Error", 
                            "The application needs to restart due to an unrecoverable error")
            QTimer.singleShot(1000, self.close)

    def attempt_reconnect(self):
        """Attempt to reconnect automatically"""
        QMessageBox.critical(
            self,
            "Device Connection Error",
            "Could not connect to ADALM1000\n\n"
            "1. Check USB cable connection\n"
            "2. The device will attempt to reconnect automatically"
        )
        
        QTimer.singleShot(1000, self.reconnect_device)

    def reconnect_device(self):
        """Reconnect the device with proper cleanup"""
        self.status_bar.showMessage("Attempting to reconnect...")
        
        if hasattr(self, 'session'):
            try:
                if self.session_active:
                    self.session.end()
                del self.session
            except:
                pass
                
        self.test_running = False
        self.continuous_mode = False
        self.measuring = False
        
        if hasattr(self, 'measurement_thread'):
            self.measurement_thread.stop()

        time.sleep(1.5)
        
        try:
            self.init_device()
            if self.session_active:
                self.status_bar.showMessage("Reconnected successfully")
                return
        except Exception as e:
            print(f"Reconnect failed: {e}")
            
        self.status_bar.showMessage("Reconnect failed - will retry...")
        QTimer.singleShot(2000, self.reconnect_device)

    def closeEvent(self, event):
        """Clean up on window close"""
        self.test_running = False
        self.measuring = False
        self.session_active = False
        
        # Stop measurement thread
        if hasattr(self, 'measurement_thread'):
            self.measurement_thread.stop()
        
        # Stop test sequence thread
        if hasattr(self, 'test_sequence_thread'):
            if hasattr(self, 'test_sequence_worker'):
                self.test_sequence_worker.stop()
            self.test_sequence_thread.quit()
            self.test_sequence_thread.wait(500)
        
        # Clean up device session
        if hasattr(self, 'session') and self.session:
            try:
                self.session.end()
            except Exception as e:
                print(f"Error ending session: {e}")

        event.accept()

if __name__ == "__main__":
    app = QApplication([])
    try:
        window = BatteryTester()
        window.show()
        app.exec_()
    except Exception as e:
        QMessageBox.critical(None, "Fatal Error", f"Application failed: {str(e)}") 
fixed by C
This commit is contained in:
Jan 2025-07-07 11:56:44 +02:00
parent c52779e104
commit 451afb1a23

View File

@ -836,6 +836,7 @@ class BatteryTester(QMainWindow):
self.dev.channels['A'].constant(0)
except Exception as e:
print(f"Error resetting device: {e}")
test_current = self.c_rate * self.capacity
# Only try to close if file exists and is open
if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None: