From b5380e5a332abbc8031bc60eaf3cba5ee2a324ad Mon Sep 17 00:00:00 2001 From: Jan Date: Thu, 3 Jul 2025 17:40:03 +0200 Subject: [PATCH] MainCode/adalm1000_logger.py aktualisiert plot and tata works description overlapping test untested --- MainCode/adalm1000_logger.py | 432 +++++++++++++++++++---------------- 1 file changed, 239 insertions(+), 193 deletions(-) diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py index 8ada27b..af1b068 100644 --- a/MainCode/adalm1000_logger.py +++ b/MainCode/adalm1000_logger.py @@ -10,6 +10,7 @@ matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure from collections import deque +from queue import Queue, Full, Empty from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog) @@ -19,7 +20,7 @@ import pysmu class DeviceDisconnectedError(Exception): pass -class MeasurementThread(QThread): +class MeasurementThread(QThread): update_signal = pyqtSignal(float, float, float) error_signal = pyqtSignal(str) @@ -27,15 +28,16 @@ class MeasurementThread(QThread): super().__init__() self.device = device self.interval = interval - self.running = False + self._running = False self.filter_window_size = 10 self.voltage_window = [] self.current_window = [] self.start_time = time.time() + self.measurement_queue = Queue(maxsize=1) def run(self): - self.running = True - while self.running: + self._running = True + while self._running: try: samples = self.device.read(self.filter_window_size, 500, True) if not samples: @@ -56,20 +58,128 @@ class MeasurementThread(QThread): current = np.mean(self.current_window) self.update_signal.emit(voltage, current, current_time) + + # Store the latest measurement in the queue + try: + self.measurement_queue.put_nowait((voltage, current)) + except Full: + pass + time.sleep(max(0.05, self.interval)) except Exception as e: - self.error_signal.emit(str(e)) - break + self.error_signal.emit(f"Read error: {str(e)}") + time.sleep(1) + continue def stop(self): - self.running = False + self._running = False + self.wait(500) + +class TestSequenceWorker(QObject): + finished = pyqtSignal() + update_phase = pyqtSignal(str) + update_status = pyqtSignal(str) + test_completed = pyqtSignal() + error_occurred = pyqtSignal(str) + + def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent): + super().__init__() + self.device = device + self.test_current = test_current + self.charge_cutoff = charge_cutoff + self.discharge_cutoff = discharge_cutoff + self.rest_time = rest_time * 3600 # Convert hours to seconds + self.continuous_mode = continuous_mode + self.parent = parent + self._running = True + self.voltage_timeout = 0.5 # seconds + + def get_latest_measurement(self): + """Thread-safe measurement reading with timeout""" + try: + return self.parent.measurement_thread.measurement_queue.get( + timeout=self.voltage_timeout + ) + except Empty: + return (None, None) # Return tuple for unpacking + + def charge_phase(self): + """Handle the battery charging phase""" + self.update_phase.emit("Charge") + self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.3f}A") + + self.device.channels['B'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].constant(self.test_current) + + while self._running: + voltage, current = self.get_latest_measurement() + if voltage is None: + continue + + # Update parent's data for logging/display + with self.parent.plot_mutex: + if len(self.parent.voltage_data) > 0: + self.parent.voltage_data[-1] = voltage + self.parent.current_data[-1] = current + + if voltage >= self.charge_cutoff: + break + + time.sleep(0.1) + + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) + + def discharge_phase(self): + """Handle the battery discharging phase""" + self.update_phase.emit("Discharge") + self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A") + + self.device.channels['A'].mode = pysmu.Mode.SIMV + self.device.channels['A'].constant(-self.test_current) + + while self._running: + voltage, current = self.get_latest_measurement() + if voltage is None: + continue + + # Update parent's data for logging/display + with self.parent.plot_mutex: + if len(self.parent.voltage_data) > 0: + self.parent.voltage_data[-1] = voltage + self.parent.current_data[-1] = current + + if voltage <= self.discharge_cutoff: + break + + time.sleep(0.1) + + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) + + def rest_phase(self, phase_name): + """Handle rest period between phases""" + self.update_phase.emit(f"Resting ({phase_name})") + rest_end = time.time() + self.rest_time + + while time.time() < rest_end and self._running: + time_left = max(0, rest_end - time.time()) + self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min") + time.sleep(1) + + def stop(self): + """Request the thread to stop""" + self._running = False + self.device.channels['A'].mode = pysmu.Mode.HI_Z + self.device.channels['A'].constant(0) class BatteryTester(QMainWindow): def __init__(self): self.plot_mutex = threading.Lock() super().__init__() - + # Color scheme self.bg_color = "#2E3440" self.fg_color = "#D8DEE9" @@ -437,9 +547,14 @@ class BatteryTester(QMainWindow): self.current_label.setText(f"{current:.4f}") self.time_label.setText(self.format_time(current_time)) - # Update plot periodically - if len(self.time_data) % 10 == 0: # Update plot every 10 samples - self.update_plot() + # Throttle plot updates to avoid recursive repaint + now = time.time() + if not hasattr(self, '_last_plot_update'): + self._last_plot_update = 0 + + if now - self._last_plot_update > 0.1: # Update plot max 10 times per second + self._last_plot_update = now + QTimer.singleShot(0, self.update_plot) def update_status(self): """Update status information periodically""" @@ -478,11 +593,7 @@ class BatteryTester(QMainWindow): if self.c_rate <= 0: raise ValueError("C-rate must be positive") - self.continuous_mode = self.continuous_mode_check.isChecked() - self.measurement_start_time = time.time() - self.test_start_time = time.time() test_current = self.c_rate * self.capacity - if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") @@ -515,8 +626,28 @@ class BatteryTester(QMainWindow): self.stop_button.setEnabled(True) self.status_bar.showMessage(f"Test started | Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A") - # Start test sequence in a new thread - self.test_sequence_thread = threading.Thread(target=self.run_test_sequence, daemon=True) + # Start test sequence in a QThread + self.test_sequence_thread = QThread() + self.test_sequence_worker = TestSequenceWorker( + self.dev, + test_current, + self.charge_cutoff, + self.discharge_cutoff, + self.rest_time, + self.continuous_mode_check.isChecked(), + self # Pass reference to main window for callbacks + ) + self.test_sequence_worker.moveToThread(self.test_sequence_thread) + + # Connect signals + self.test_sequence_worker.update_phase.connect(self.update_test_phase) + self.test_sequence_worker.update_status.connect(self.status_bar.showMessage) + self.test_sequence_worker.test_completed.connect(self.finalize_test) + self.test_sequence_worker.error_occurred.connect(self.handle_test_error) + self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit) + self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater) + self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater) + self.test_sequence_thread.start() except Exception as e: @@ -595,165 +726,6 @@ class BatteryTester(QMainWindow): self.finalize_test() - def run_test_sequence(self): - try: - test_current = self.c_rate * self.capacity - - while self.test_running and (self.continuous_mode or self.cycle_count == 0): - self.request_stop = False - self.cycle_count += 1 - self.cycle_label.setText(str(self.cycle_count)) - - self.create_cycle_log_file() - - # 1. Charge phase - self.test_phase = "Charge" - self.phase_label.setText(self.test_phase) - self.status_bar.showMessage(f"Charging to {self.charge_cutoff}V @ {test_current:.3f}A") - - self.measuring = True - self.dev.channels['B'].mode = pysmu.Mode.HI_Z - self.dev.channels['A'].mode = pysmu.Mode.SIMV - self.dev.channels['A'].constant(test_current) - self.charge_capacity = 0.0 - self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}") - target_voltage = self.charge_cutoff - self.last_update_time = time.time() - - while self.test_running and not self.request_stop: - if not self.voltage_data: - time.sleep(0.1) - continue - - current_voltage = self.voltage_data[-1] - measured_current = abs(self.current_data[-1]) - - # Log data - if hasattr(self, 'current_cycle_file'): - self.log_buffer.append([ - f"{time.time() - self.start_time:.3f}", - f"{current_voltage:.6f}", - f"{measured_current:.6f}", - self.test_phase, - f"{self.capacity_ah:.4f}", - f"{self.charge_capacity:.4f}", - f"{self.coulomb_efficiency:.1f}", - f"{self.cycle_count}" - ]) - - if len(self.log_buffer) >= 10: - self.log_writer.writerows(self.log_buffer) - self.log_buffer.clear() - - if current_voltage >= target_voltage or self.request_stop: - break - - time.sleep(0.1) - - if self.request_stop or not self.test_running: - break - - # 2. Rest period after charge - self.test_phase = "Resting (Post-Charge)" - self.phase_label.setText(self.test_phase) - self.measuring = False - self.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.dev.channels['A'].constant(0) - - rest_end_time = time.time() + (self.rest_time * 3600) - while time.time() < rest_end_time and self.test_running and not self.request_stop: - time_left = max(0, rest_end_time - time.time()) - self.status_bar.showMessage(f"Resting after charge | Time left: {time_left/60:.1f} min") - time.sleep(1) - - if self.request_stop or not self.test_running: - break - - # 3. Discharge phase - self.test_phase = "Discharge" - self.phase_label.setText(self.test_phase) - self.status_bar.showMessage(f"Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A") - - self.measuring = True - self.dev.channels['A'].mode = pysmu.Mode.SIMV - self.dev.channels['A'].constant(-test_current) - self.capacity_ah = 0.0 - self.capacity_label.setText(f"{self.capacity_ah:.4f}") - self.last_update_time = time.time() - - while self.test_running and not self.request_stop: - if not self.current_data: - time.sleep(0.1) - continue - - current_voltage = self.voltage_data[-1] - current_current = abs(self.current_data[-1]) - - # Log data - if hasattr(self, 'current_cycle_file'): - self.log_buffer.append([ - f"{time.time() - self.start_time:.3f}", - f"{current_voltage:.6f}", - f"{current_current:.6f}", - self.test_phase, - f"{self.capacity_ah:.4f}", - f"{self.charge_capacity:.4f}", - f"{self.coulomb_efficiency:.1f}", - f"{self.cycle_count}" - ]) - - if len(self.log_buffer) >= 10: - self.log_writer.writerows(self.log_buffer) - self.log_buffer.clear() - - if current_voltage <= self.discharge_cutoff or self.request_stop: - break - - if not self.continuous_mode_check.isChecked(): - self.test_running = False - self.test_phase = "Idle" - self.phase_label.setText(self.test_phase) - break - - # 4. Rest period after discharge - if self.test_running and not self.request_stop: - self.test_phase = "Resting (Post-Discharge)" - self.phase_label.setText(self.test_phase) - self.measuring = False - self.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.dev.channels['A'].constant(0) - - rest_end_time = time.time() + (self.rest_time * 3600) - while time.time() < rest_end_time and self.test_running and not self.request_stop: - time_left = max(0, rest_end_time - time.time()) - self.status_bar.showMessage(f"Resting after discharge | Time left: {time_left/60:.1f} min") - time.sleep(1) - - # Calculate Coulomb efficiency - if not self.request_stop and self.charge_capacity > 0: - efficiency = (self.capacity_ah / self.charge_capacity) * 100 - self.coulomb_efficiency = efficiency - self.efficiency_label.setText(f"{efficiency:.1f}") - - self.status_bar.showMessage( - f"Cycle {self.cycle_count} complete | " - f"Discharge: {self.capacity_ah:.3f}Ah | " - f"Charge: {self.charge_capacity:.3f}Ah | " - f"Efficiency: {efficiency:.1f}%" - ) - - self.write_cycle_summary() - - if self.log_buffer: - self.log_writer.writerows(self.log_buffer) - self.log_buffer.clear() - - self.finalize_test() - - except Exception as e: - self.status_bar.showMessage(f"Test error: {str(e)}") - self.finalize_test() - def finalize_test(self): """Final cleanup after test completes or is stopped""" self.measuring = False @@ -839,14 +811,34 @@ class BatteryTester(QMainWindow): print(f"Error writing cycle summary: {e}") def update_plot(self): - """Optimized plot update with change detection""" - if not self.time_data: + """More reliable plotting with better error handling""" + if not self.time_data or len(self.time_data) != len(self.voltage_data): + print("Plot: No data or mismatched lengths") # Debug return + + try: + # Create local copies quickly + with self.plot_mutex: + x_data = np.array(self.time_data) + y1_data = np.array(self.voltage_data) + y2_data = np.array(self.current_data) - with self.plot_mutex: - self.line_voltage.set_data(self.time_data, self.voltage_data) - self.line_current.set_data(self.time_data, self.current_data) - self.auto_scale_axes() + # Update plot data + self.line_voltage.set_data(x_data, y1_data) + self.line_current.set_data(x_data, y2_data) + + # Only auto-scale when needed + if x_data[-1] > self.ax.get_xlim()[1] * 0.8: + self.auto_scale_axes() + + # Force redraw + self.canvas.draw_idle() + + except Exception as e: + print(f"Plot error: {e}") + # Reset plot on error + self.line_voltage.set_data([], []) + self.line_current.set_data([], []) self.canvas.draw_idle() def auto_scale_axes(self): @@ -908,15 +900,62 @@ class BatteryTester(QMainWindow): self.time_data.clear() self.voltage_data.clear() self.current_data.clear() + + @pyqtSlot(str) + def update_test_phase(self, phase_text): + """Update the test phase display""" + self.test_phase = phase_text + self.phase_label.setText(phase_text) + + # Update log if available + if hasattr(self, 'log_buffer'): + current_time = time.time() - self.start_time + self.log_buffer.append([ + f"{current_time:.3f}", + "", + "", + phase_text, + f"{self.capacity_ah:.4f}", + f"{self.charge_capacity:.4f}", + f"{self.coulomb_efficiency:.1f}" if hasattr(self, 'coulomb_efficiency') else "0.0", + f"{self.cycle_count}" + ]) - if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'): - self.line_voltage.set_data([], []) - self.line_current.set_data([], []) - self.ax.set_xlim(0, 1) - self.ax2.set_xlim(0, 1) - self.canvas.draw() - - self.attempt_reconnect() + @pyqtSlot(str) + def handle_test_error(self, error_msg): + """Handle errors from the test sequence with complete cleanup""" + try: + # 1. Notify user + QMessageBox.critical(self, "Test Error", + f"An error occurred:\n{error_msg}\n\nAttempting to recover...") + + # 2. Stop all operations + self.stop_test() + + # 3. Reset UI elements + if hasattr(self, 'line_voltage'): + try: + self.line_voltage.set_data([], []) + self.line_current.set_data([], []) + self.ax.set_xlim(0, 1) + self.ax2.set_xlim(0, 1) + self.canvas.draw() + except Exception as plot_error: + print(f"Plot reset error: {plot_error}") + + # 4. Update status + self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...") + self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;") + + # 5. Attempt recovery + QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect + + except Exception as e: + print(f"Error in error handler: {e}") + # Fallback - restart application? + QMessageBox.critical(self, "Fatal Error", + "The application needs to restart due to an unrecoverable error") + QTimer.singleShot(1000, self.close) def attempt_reconnect(self): """Attempt to reconnect automatically""" @@ -968,11 +1007,18 @@ class BatteryTester(QMainWindow): self.measuring = False self.session_active = False + # Stop measurement thread if hasattr(self, 'measurement_thread'): self.measurement_thread.stop() - self.measurement_thread.quit() - self.measurement_thread.wait(1000) # Wait up to 1 second - + + # Stop test sequence thread + if hasattr(self, 'test_sequence_thread'): + if hasattr(self, 'test_sequence_worker'): + self.test_sequence_worker.stop() + self.test_sequence_thread.quit() + self.test_sequence_thread.wait(500) + + # Clean up device session if hasattr(self, 'session') and self.session: try: self.session.end()