From c97256395d190cf2f8125b19be4bd690f029cc20 Mon Sep 17 00:00:00 2001 From: Jan Date: Tue, 29 Jul 2025 12:30:34 +0200 Subject: [PATCH] MainCode/adalm1000_logger.py aktualisiert MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Alles außer capacity sollte funzen (D) --- MainCode/adalm1000_logger.py | 288 +++++++++++++++++++++-------------- 1 file changed, 171 insertions(+), 117 deletions(-) diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py index 20d2999..8076a1e 100644 --- a/MainCode/adalm1000_logger.py +++ b/MainCode/adalm1000_logger.py @@ -38,39 +38,21 @@ class MeasurementThread(QThread): self.current_direction = 1 # 1 for source, -1 for sink def run(self): - """Continuous measurement with proper validation""" + """Continuous measurement loop""" self._running = True - consecutive_errors = 0 - while self._running: try: samples = self.device.read(self.filter_window_size, 500, True) - - # Check for device disconnection if not samples: - consecutive_errors += 1 - if consecutive_errors > 3: - raise DeviceDisconnectedError("Keine Messwerte empfangen") - time.sleep(0.1) - continue - - consecutive_errors = 0 # Reset counter on successful read + raise DeviceDisconnectedError("No samples received") current_time = time.time() - self.start_time - raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B - # Strict voltage validation (0-5V range) - if raw_voltage < 0 or raw_voltage > 5.0: - raise ValueError(f"Ungültige Spannung: {raw_voltage}V (außerhalb 0-5V Bereich)") + # Get voltage from Channel B (HI_Z mode) and current from Channel A + raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage + raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction # Channel A current with direction - # Current measurement with direction - raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction - - # Current validation (-200mA to +200mA) - if not (-0.25 <= raw_current <= 0.25): - raise ValueError(f"Ungültiger Strom: {raw_current}A (außerhalb ±200mA Bereich)") - - # Apply filtering + # Update filter windows self.voltage_window.append(raw_voltage) self.current_window.append(raw_current) @@ -81,24 +63,26 @@ class MeasurementThread(QThread): voltage = np.mean(self.voltage_window) current = np.mean(self.current_window) - # Emit measurements + # Validate measurements + if voltage is None or not (-1.0 <= voltage <= 6.0): + raise ValueError(f"Invalid voltage: {voltage}V") + if not (-0.25 <= current <= 0.25): + raise ValueError(f"Invalid current: {current}A") + + # Emit update self.update_signal.emit(voltage, current, current_time) - # Store in queue + # Store measurement try: self.measurement_queue.put_nowait((voltage, current)) except Full: pass - + time.sleep(max(0.05, self.interval)) - except DeviceDisconnectedError as e: - self.error_signal.emit(f"Gerätefehler: {str(e)}") - time.sleep(1) - continue except Exception as e: - self.error_signal.emit(f"Messfehler: {str(e)}") - time.sleep(0.5) + self.error_signal.emit(f"Read error: {str(e)}") + time.sleep(1) continue def set_direction(self, direction): @@ -448,10 +432,21 @@ class BatteryTester(QMainWindow): os.makedirs(self.log_dir, exist_ok=True) # Data buffers - self.time_data = deque() - self.voltage_data = deque() - self.current_data = deque() + max_data_points = 36000 # Define this first + self.time_data = deque(maxlen=max_data_points) + self.voltage_data = deque(maxlen=max_data_points) + self.current_data = deque(maxlen=max_data_points) + self.max_points_to_keep = 10000 + self.display_time_data = deque(maxlen=self.max_points_to_keep) + self.display_voltage_data = deque(maxlen=self.max_points_to_keep) + self.display_current_data = deque(maxlen=self.max_points_to_keep) + self.aggregation_buffer = { + 'time': [], 'voltage': [], 'current': [], + 'count': 0, 'last_plot_time': 0 + } self.phase_data = deque() + self.downsample_factor = 1 # Initial kein Downsampling + self.downsample_counter = 0 # Initialize UI and device self.setup_ui() @@ -464,8 +459,8 @@ class BatteryTester(QMainWindow): # Status update timer self.status_timer = QTimer() - self.status_timer.timeout.connect(self.update_status) - self.status_timer.start(1000) # Update every second + self.status_timer.timeout.connect(self.update_status_and_plot) + self.status_timer.start(1000) #every second def setup_ui(self): """Configure the user interface""" @@ -785,23 +780,33 @@ class BatteryTester(QMainWindow): self.rest_time_input.show() self.continuous_mode_check.show() self.start_button.setText("START CYCLE TEST") + self.start_button.setEnabled(True) # Explicitly enable elif mode_name == "Discharge Test": self.discharge_cutoff_label.show() self.discharge_cutoff_input.show() self.start_button.setText("START DISCHARGE") + self.start_button.setEnabled(True) # Explicitly enable elif mode_name == "Charge Test": self.charge_cutoff_label.show() self.charge_cutoff_input.show() self.start_button.setText("START CHARGE") + self.start_button.setEnabled(True) # Explicitly enable elif mode_name == "Live Monitoring": self.record_button.show() self.start_button.setText("START MONITORING") - self.start_button.setEnabled(False) + # Only enable start button if device is connected + self.start_button.setEnabled(self.session_active) + # Reset measurement state + self.reset_test() + self.status_bar.showMessage(f"Mode changed to {mode_name}") def reset_test(self): """Reset test state without stopping measurement""" + # Reset Downsampling + self.downsample_factor = 1 + self.downsample_counter = 0 # Clear data buffers with self.plot_mutex: self.time_data.clear() @@ -908,55 +913,33 @@ class BatteryTester(QMainWindow): self.main_layout.addWidget(self.canvas, 1) def init_device(self): - """Initialize the ADALM1000 with proper connection checks""" + """Initialize the ADALM1000 device with continuous measurement""" try: - # Cleanup previous session + # Clean up any existing session if hasattr(self, 'session'): try: self.session.end() del self.session except: pass - - # Hardware reset delay + time.sleep(1) - - try: - self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) - if not self.session.devices: - raise Exception("Kein ADALM1000 erkannt - Verbindung prüfen") - except Exception as e: - if "resource busy" in str(e).lower(): - raise Exception("ADALM1000 wird bereits von einem anderen Programm verwendet") - raise + + self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) + if not self.session.devices: + raise Exception("No ADALM1000 detected - check connections") self.dev = self.session.devices[0] - - # Set safe defaults self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].constant(0) - # Connection test with actual measurement - try: - samples = self.dev.read(5, 500, True) # 5 samples for stability - if not samples: - raise DeviceDisconnectedError("Keine Messwerte empfangen") - - # Check if we're getting valid data (not just noise) - voltages = [s[1][0] for s in samples] # Channel B voltages - if all(abs(v) < 0.0001 for v in voltages): # 100µV threshold - self.status_bar.showMessage("Gerät verbunden, aber keine Batterie angeschlossen") - except Exception as e: - raise DeviceDisconnectedError(f"Verbindungstest fehlgeschlagen: {str(e)}") - self.session.start(0) - - # Update UI - self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") - self.connection_label.setText("Verbunden") - self.status_bar.showMessage("Gerät bereit - Batterie anschließen um zu messen") + + self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;") + self.connection_label.setText("Connected") + self.status_bar.showMessage("Device connected | Ready to measure") self.session_active = True self.start_button.setEnabled(True) @@ -975,13 +958,52 @@ class BatteryTester(QMainWindow): # Only store data if in a test or recording if not (self.test_running or self.record_button.isChecked()): return - + + # 1. Originale Daten immer vollständig speichern (für Berechnungen und Logging) with self.plot_mutex: self.time_data.append(current_time) self.voltage_data.append(voltage) self.current_data.append(current) - # Update display labels + # 2. Downsampling für die Anzeige + if not hasattr(self, 'aggregation_buffer'): + self.aggregation_buffer = { + 'time': [], 'voltage': [], 'current': [], + 'count': 0, 'last_plot_time': 0 + } + + self.aggregation_buffer['time'].append(current_time) + self.aggregation_buffer['voltage'].append(voltage) + self.aggregation_buffer['current'].append(current) + self.aggregation_buffer['count'] += 1 + + # Nur aggregieren wenn genug Daten oder Zeit vergangen + now = time.time() + if (self.aggregation_buffer['count'] >= self.downsample_factor or + now - self.aggregation_buffer['last_plot_time'] >= 1.0): + + # Berechne aggregierte Werte (Mittelwert) + agg_time = np.mean(self.aggregation_buffer['time']) + agg_voltage = np.mean(self.aggregation_buffer['voltage']) + agg_current = np.mean(self.aggregation_buffer['current']) + + # Für die Anzeige verwenden + if not hasattr(self, 'display_time_data'): + self.display_time_data = deque(maxlen=self.max_points_to_keep) + self.display_voltage_data = deque(maxlen=self.max_points_to_keep) + self.display_current_data = deque(maxlen=self.max_points_to_keep) + + self.display_time_data.append(agg_time) + self.display_voltage_data.append(agg_voltage) + self.display_current_data.append(agg_current) + + # Reset Buffer + self.aggregation_buffer = { + 'time': [], 'voltage': [], 'current': [], + 'count': 0, 'last_plot_time': now + } + + # 3. Originale Funktionalität für Berechnungen beibehalten self.voltage_label.setText(f"{voltage:.4f}") self.current_label.setText(f"{abs(current):.4f}") self.time_label.setText(self.format_time(current_time)) @@ -995,8 +1017,11 @@ class BatteryTester(QMainWindow): self.energy += power * delta_t / 3600 # Convert to Wh self.energy_label.setText(f"{self.energy:.4f}") - # Plot updates throttled to 10Hz - now = time.time() + # 4. Auto-Skalierung anpassen + if len(self.time_data) > self.max_points_to_keep * 1.5: + self.adjust_downsampling() + + # 5. Plot updates throttled to 10Hz if not hasattr(self, '_last_plot_update'): self._last_plot_update = 0 @@ -1006,6 +1031,48 @@ class BatteryTester(QMainWindow): except Exception as e: print(f"Error in update_measurements: {e}") + import traceback + traceback.print_exc() + # Versuche den Aggregationsbuffer zu retten + if hasattr(self, 'aggregation_buffer'): + agg_buffer = self.aggregation_buffer + if agg_buffer['count'] > 0: + try: + with self.plot_mutex: + if not hasattr(self, 'display_time_data'): + self.display_time_data = deque(maxlen=self.max_points_to_keep) + self.display_voltage_data = deque(maxlen=self.max_points_to_keep) + self.display_current_data = deque(maxlen=self.max_points_to_keep) + + self.display_time_data.append(np.mean(agg_buffer['time'])) + self.display_voltage_data.append(np.mean(agg_buffer['voltage'])) + self.display_current_data.append(np.mean(agg_buffer['current'])) + except: + pass + self.aggregation_buffer = { + 'time': [], 'voltage': [], 'current': [], + 'count': 0, 'last_plot_time': time.time() + } + + def adjust_downsampling(self): + current_length = len(self.time_data) + if current_length > self.max_points_to_keep * 1.5: + # Exponentiell erhöhen, aber max. 64 + new_factor = min(64, max(1, self.downsample_factor * 2)) + elif current_length < self.max_points_to_keep // 2: + # Halbieren, aber min. 1 + new_factor = max(1, self.downsample_factor // 2) + else: + return + + if new_factor != self.downsample_factor: + self.downsample_factor = new_factor + self.status_bar.showMessage( + f"Downsampling: Factor {self.downsample_factor}", 2000) + def update_status_and_plot(self): + """Combined status and plot update""" + self.update_status() + self.update_plot() def update_status(self): """Update status information periodically""" @@ -1818,45 +1885,38 @@ class BatteryTester(QMainWindow): # 5. Force immediate redraw self.canvas.draw() + def update_status_and_plot(self): + """Combined status and plot update""" + self.update_status() + self.update_plot() + def update_plot(self): - """More reliable plotting with better error handling""" + """More robust plotting with error handling""" try: - # Create local copies safely + # Create local copies of data safely with self.plot_mutex: - if not self.time_data or not self.voltage_data or not self.current_data: + if not self.display_time_data: return - if len(self.time_data) != len(self.voltage_data) or len(self.time_data) != len(self.current_data): - # Find the minimum length to avoid mismatch - min_len = min(len(self.time_data), len(self.voltage_data), len(self.current_data)) - x_data = np.array(self.time_data[-min_len:]) - y1_data = np.array(self.voltage_data[-min_len:]) - y2_data = np.array(self.current_data[-min_len:]) - else: - x_data = np.array(self.time_data) - y1_data = np.array(self.voltage_data) - y2_data = np.array(self.current_data) + x_data = np.array(self.display_time_data) + y1_data = np.array(self.display_voltage_data) + y2_data = np.array(self.display_current_data) # Update plot data self.line_voltage.set_data(x_data, y1_data) self.line_current.set_data(x_data, y2_data) # Auto-scale when needed - if len(x_data) > 0 and x_data[-1] > self.ax.get_xlim()[1] * 0.8: + if len(x_data) > 1: self.auto_scale_axes() # Force redraw self.canvas.draw_idle() except Exception as e: - print(f"Plot update error: {e}") - import traceback - traceback.print_exc() - # Reset plot on error - with self.plot_mutex: - self.line_voltage.set_data([], []) - self.line_current.set_data([], []) - self.canvas.draw_idle() + print(f"Plot error: {e}") + # Attempt to recover + self.reset_plot() def auto_scale_axes(self): """Auto-scale plot axes with appropriate padding and strict boundaries""" @@ -1890,39 +1950,33 @@ class BatteryTester(QMainWindow): @pyqtSlot(str) def handle_device_error(self, error): - """Handle errors with proper state management""" + """Handle device connection errors""" error_msg = str(error) - print(f"Fehler: {error_msg}") + print(f"Device error: {error_msg}") - # Special cases - if "resource busy" in error_msg.lower(): - error_msg = "ADALM1000 is already in use" - elif "no samples" in error_msg.lower(): - error_msg = "No measurments - Check connection" - - # Cleanup if hasattr(self, 'session'): try: if self.session_active: self.session.end() del self.session - except: - pass + except Exception as e: + print(f"Error cleaning up session: {e}") - # Update UI - self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") + self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;") self.connection_label.setText("Disconnected") - self.status_bar.showMessage(f"Error: {error_msg}") - - # Disable controls + self.status_bar.showMessage(f"Device error: {error_msg}") + self.session_active = False self.test_running = False + self.continuous_mode = False + self.measuring = False + self.start_button.setEnabled(False) self.stop_button.setEnabled(False) - # Attempt recovery for certain errors - if "no samples" in error_msg.lower() or "resource busy" in error_msg.lower(): - QTimer.singleShot(3000, self.reconnect_device) + self.time_data.clear() + self.voltage_data.clear() + self.current_data.clear() @pyqtSlot(str) def update_test_phase(self, phase_text):